diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index b5ecf39e90..2e4c8b41c7 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -160,6 +160,7 @@ class Metrics: IntM('callback_receiver_batch_events_errors', 'Number of times batch insertion failed'), FloatM('callback_receiver_events_insert_db_seconds', 'Time spent saving events to database'), IntM('callback_receiver_events_insert_db', 'Number of events batch inserted into database'), + IntM('callback_receiver_events_broadcast', 'Number of events broadcast to other control plane nodes'), HistogramM( 'callback_receiver_batch_events_insert_db', 'Number of events batch inserted into database', settings.SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS ), diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index ccf3b063d1..a88286364a 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -116,19 +116,20 @@ class CallbackBrokerWorker(BaseWorker): def flush(self, force=False): now = tz_now() if force or (time.time() - self.last_flush) > settings.JOB_EVENT_BUFFER_SECONDS or any([len(events) >= 1000 for events in self.buff.values()]): - bulk_events_saved = 0 - singular_events_saved = 0 + metrics_bulk_events_saved = 0 + metrics_singular_events_saved = 0 metrics_events_batch_save_errors = 0 + metrics_events_broadcast = 0 for cls, events in self.buff.items(): logger.debug(f'{cls.__name__}.objects.bulk_create({len(events)})') for e in events: if not e.created: e.created = now e.modified = now - duration_to_save = time.perf_counter() + metrics_duration_to_save = time.perf_counter() try: cls.objects.bulk_create(events) - bulk_events_saved += len(events) + metrics_bulk_events_saved += len(events) except Exception: # if an exception occurs, we should re-attempt to save the # events one-by-one, because something in the list is @@ -137,22 +138,24 @@ class CallbackBrokerWorker(BaseWorker): for e in events: try: e.save() - singular_events_saved += 1 + metrics_singular_events_saved += 1 except Exception: logger.exception('Database Error Saving Job Event') - duration_to_save = time.perf_counter() - duration_to_save + metrics_duration_to_save = time.perf_counter() - metrics_duration_to_save for e in events: if not getattr(e, '_skip_websocket_message', False): + metrics_events_broadcast += 1 emit_event_detail(e) self.buff = {} self.last_flush = time.time() # only update metrics if we saved events - if (bulk_events_saved + singular_events_saved) > 0: + if (metrics_bulk_events_saved + metrics_singular_events_saved) > 0: self.subsystem_metrics.inc('callback_receiver_batch_events_errors', metrics_events_batch_save_errors) - self.subsystem_metrics.inc('callback_receiver_events_insert_db_seconds', duration_to_save) - self.subsystem_metrics.inc('callback_receiver_events_insert_db', bulk_events_saved + singular_events_saved) - self.subsystem_metrics.observe('callback_receiver_batch_events_insert_db', bulk_events_saved) - self.subsystem_metrics.inc('callback_receiver_events_in_memory', -(bulk_events_saved + singular_events_saved)) + self.subsystem_metrics.inc('callback_receiver_events_insert_db_seconds', metrics_duration_to_save) + self.subsystem_metrics.inc('callback_receiver_events_insert_db', metrics_bulk_events_saved + metrics_singular_events_saved) + self.subsystem_metrics.observe('callback_receiver_batch_events_insert_db', metrics_bulk_events_saved) + self.subsystem_metrics.inc('callback_receiver_events_in_memory', -(metrics_bulk_events_saved + metrics_singular_events_saved)) + self.subsystem_metrics.inc('callback_receiver_events_broadcast', metrics_events_broadcast) if self.subsystem_metrics.should_pipe_execute() is True: self.subsystem_metrics.pipe_execute()