Merge pull request #12308 from rebeccahhh/job_event_lag

Metrics for callback receiver job event lag
This commit is contained in:
Rebeccah Hunter 2022-06-07 11:50:17 -04:00 committed by GitHub
commit 76ffdbb993
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 6 deletions

View File

@ -34,7 +34,7 @@ class BaseM:
def to_prometheus(self, instance_data):
output_text = f"# HELP {self.field} {self.help_text}\n# TYPE {self.field} gauge\n"
for instance in instance_data:
output_text += f'{self.field}{{node="{instance}"}} {instance_data[instance][self.field]}\n'
output_text += f'{self.field}{{node="{instance}"}} {instance_data[instance].get(self.field, -1)}\n' # TODO: fix because this -1 is neccessary when dealing with old instances (ex. you didn't clean up your database)
return output_text
@ -161,7 +161,7 @@ class Metrics:
IntM('callback_receiver_events_popped_redis', 'Number of events popped from redis'),
IntM('callback_receiver_events_in_memory', 'Current number of events in memory (in transfer from redis to db)'),
IntM('callback_receiver_batch_events_errors', 'Number of times batch insertion failed'),
FloatM('callback_receiver_events_insert_db_seconds', 'Time spent saving events to database'),
FloatM('callback_receiver_events_insert_db_seconds', 'Total time spent saving events to database'),
IntM('callback_receiver_events_insert_db', 'Number of events batch inserted into database'),
IntM('callback_receiver_events_broadcast', 'Number of events broadcast to other control plane nodes'),
HistogramM(
@ -170,6 +170,7 @@ class Metrics:
FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'),
IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'),
FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'),
SetFloatM('callback_receiver_event_processing_avg_seconds', 'Average processing time per event per callback receiver batch'),
]
# turn metric list into dictionary with the metric name as a key
self.METRICS = {}
@ -179,9 +180,11 @@ class Metrics:
# track last time metrics were sent to other nodes
self.previous_send_metrics = SetFloatM('send_metrics_time', 'Timestamp of previous send_metrics call')
def clear_values(self):
for m in self.METRICS.values():
m.clear_value(self.conn)
def clear_values(self, fields=None):
if not fields:
fields = self.METRICS.keys()
for m in fields:
self.METRICS[m].clear_value(self.conn)
self.metrics_have_changed = True
self.conn.delete(root_key + "_lock")

View File

@ -4,6 +4,7 @@ import os
import signal
import time
import traceback
import datetime
from django.conf import settings
from django.utils.functional import cached_property
@ -151,12 +152,17 @@ class CallbackBrokerWorker(BaseWorker):
metrics_singular_events_saved = 0
metrics_events_batch_save_errors = 0
metrics_events_broadcast = 0
metrics_events_missing_created = 0
metrics_total_job_event_processing_seconds = datetime.timedelta(seconds=0)
for cls, events in self.buff.items():
logger.debug(f'{cls.__name__}.objects.bulk_create({len(events)})')
for e in events:
e.modified = now # this can be set before created because now is set above on line 149
if not e.created:
e.created = now
e.modified = now
metrics_events_missing_created += 1
else: # only calculate the seconds if the created time already has been set
metrics_total_job_event_processing_seconds += e.modified - e.created
metrics_duration_to_save = time.perf_counter()
try:
cls.objects.bulk_create(events)
@ -189,6 +195,11 @@ class CallbackBrokerWorker(BaseWorker):
self.subsystem_metrics.observe('callback_receiver_batch_events_insert_db', metrics_bulk_events_saved)
self.subsystem_metrics.inc('callback_receiver_events_in_memory', -(metrics_bulk_events_saved + metrics_singular_events_saved))
self.subsystem_metrics.inc('callback_receiver_events_broadcast', metrics_events_broadcast)
self.subsystem_metrics.set(
'callback_receiver_event_processing_avg_seconds',
metrics_total_job_event_processing_seconds.total_seconds()
/ (metrics_bulk_events_saved + metrics_singular_events_saved - metrics_events_missing_created),
)
if self.subsystem_metrics.should_pipe_execute() is True:
self.subsystem_metrics.pipe_execute()