From 5f9326b1318d83f49d3e7483a6c80e29656759af Mon Sep 17 00:00:00 2001 From: Rebeccah Date: Tue, 31 May 2022 14:23:48 -0400 Subject: [PATCH] added average event processing metric (in seconds) that can be served to grafana via prometheus. This metric is a good indicator of how far behind the callback receiver is. The higher the load the further behind/the greater the number of seconds the metric will display. This number being high may indicate the need for horizontal scaling in the control plane or vertically scaling the number of callback receivers. --- awx/main/analytics/subsystem_metrics.py | 13 ++++++++----- awx/main/dispatch/worker/callback.py | 13 ++++++++++++- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index a08211b311..c1ad08a0cb 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -34,7 +34,7 @@ class BaseM: def to_prometheus(self, instance_data): output_text = f"# HELP {self.field} {self.help_text}\n# TYPE {self.field} gauge\n" for instance in instance_data: - output_text += f'{self.field}{{node="{instance}"}} {instance_data[instance][self.field]}\n' + output_text += f'{self.field}{{node="{instance}"}} {instance_data[instance].get(self.field, -1)}\n' # TODO: fix because this -1 is necessary when dealing with old instances (e.g. 
you didn't clean up your database) return output_text @@ -161,7 +161,7 @@ class Metrics: IntM('callback_receiver_events_popped_redis', 'Number of events popped from redis'), IntM('callback_receiver_events_in_memory', 'Current number of events in memory (in transfer from redis to db)'), IntM('callback_receiver_batch_events_errors', 'Number of times batch insertion failed'), - FloatM('callback_receiver_events_insert_db_seconds', 'Time spent saving events to database'), + FloatM('callback_receiver_events_insert_db_seconds', 'Total time spent saving events to database'), IntM('callback_receiver_events_insert_db', 'Number of events batch inserted into database'), IntM('callback_receiver_events_broadcast', 'Number of events broadcast to other control plane nodes'), HistogramM( @@ -170,6 +170,7 @@ class Metrics: FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'), IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'), FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'), + SetFloatM('callback_receiver_event_processing_avg_seconds', 'Average processing time per event per callback receiver batch'), ] # turn metric list into dictionary with the metric name as a key self.METRICS = {} @@ -179,9 +180,11 @@ class Metrics: # track last time metrics were sent to other nodes self.previous_send_metrics = SetFloatM('send_metrics_time', 'Timestamp of previous send_metrics call') - def clear_values(self): - for m in self.METRICS.values(): - m.clear_value(self.conn) + def clear_values(self, fields=None): + if not fields: + fields = self.METRICS.keys() + for m in fields: + self.METRICS[m].clear_value(self.conn) self.metrics_have_changed = True self.conn.delete(root_key + "_lock") diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index 9df77a68c8..b54c3e7399 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py 
@@ -4,6 +4,7 @@ import os import signal import time import traceback +import datetime from django.conf import settings from django.utils.functional import cached_property @@ -151,12 +152,17 @@ class CallbackBrokerWorker(BaseWorker): metrics_singular_events_saved = 0 metrics_events_batch_save_errors = 0 metrics_events_broadcast = 0 + metrics_events_missing_created = 0 + metrics_total_job_event_processing_seconds = datetime.timedelta(seconds=0) for cls, events in self.buff.items(): logger.debug(f'{cls.__name__}.objects.bulk_create({len(events)})') for e in events: + e.modified = now # safe to set before the created check; 'now' was captured once above this loop if not e.created: e.created = now - e.modified = now + metrics_events_missing_created += 1 + else: # only measure processing time when the created timestamp was already set + metrics_total_job_event_processing_seconds += e.modified - e.created metrics_duration_to_save = time.perf_counter() try: cls.objects.bulk_create(events) @@ -189,6 +195,11 @@ class CallbackBrokerWorker(BaseWorker): self.subsystem_metrics.observe('callback_receiver_batch_events_insert_db', metrics_bulk_events_saved) self.subsystem_metrics.inc('callback_receiver_events_in_memory', -(metrics_bulk_events_saved + metrics_singular_events_saved)) self.subsystem_metrics.inc('callback_receiver_events_broadcast', metrics_events_broadcast) + self.subsystem_metrics.set( + 'callback_receiver_event_processing_avg_seconds', + metrics_total_job_event_processing_seconds.total_seconds() + / max(metrics_bulk_events_saved + metrics_singular_events_saved - metrics_events_missing_created, 1), + ) if self.subsystem_metrics.should_pipe_execute() is True: self.subsystem_metrics.pipe_execute()