From 0c569c67fd81c5593eeba4a5e1c6acf5b65106d6 Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Tue, 23 Mar 2021 16:05:10 -0400 Subject: [PATCH] Add subsystem metrics - Adds a Metrics() class that can track data such as number of events the callback receiver inserted into database - Exposes this metric data at the api/v2/metrics/ endpoint. This data is prometheus-friendly - Metric data is stored in memory, then periodically saved to Redis. - Metric data is periodically broadcast to other nodes in the cluster, so that each node has a copy of the most recent metric data collected. --- awx/api/renderers.py | 14 +- awx/api/templates/api/metrics_view.md | 1 + awx/api/views/metrics.py | 8 +- awx/main/analytics/analytics_tasks.py | 14 + awx/main/analytics/subsystem_metrics.py | 304 ++++++++++++++++++ awx/main/consumers.py | 1 - awx/main/dispatch/worker/callback.py | 39 ++- awx/main/queue.py | 3 +- awx/main/tasks.py | 2 + .../functional/analytics/test_metrics.py | 26 +- awx/main/wsbroadcast.py | 8 +- awx/settings/defaults.py | 10 + 12 files changed, 410 insertions(+), 20 deletions(-) create mode 100644 awx/api/templates/api/metrics_view.md create mode 100644 awx/main/analytics/analytics_tasks.py create mode 100644 awx/main/analytics/subsystem_metrics.py diff --git a/awx/api/renderers.py b/awx/api/renderers.py index 48cba6cf5c..d19d6ee318 100644 --- a/awx/api/renderers.py +++ b/awx/api/renderers.py @@ -129,6 +129,18 @@ class PrometheusJSONRenderer(renderers.JSONRenderer): parsed_metrics = text_string_to_metric_families(data) data = {} for family in parsed_metrics: + data[family.name] = {} + data[family.name]['help_text'] = family.documentation + data[family.name]['type'] = family.type + data[family.name]['samples'] = [] for sample in family.samples: - data[sample[0]] = {"labels": sample[1], "value": sample[2]} + sample_dict = {"labels": sample[1], "value": sample[2]} + if family.type == 'histogram': + if sample[0].endswith("_sum"): + sample_dict['sample_type'] = "sum" + elif sample[0].endswith("_count"): + sample_dict['sample_type'] = "count" + elif sample[0].endswith("_bucket"): + sample_dict['sample_type'] = "bucket" + data[family.name]['samples'].append(sample_dict) return super(PrometheusJSONRenderer, self).render(data, accepted_media_type, renderer_context) diff --git a/awx/api/templates/api/metrics_view.md b/awx/api/templates/api/metrics_view.md new file mode 100644 index 0000000000..dbc4d2e043 --- /dev/null +++ b/awx/api/templates/api/metrics_view.md @@ -0,0 +1 @@ +query params to filter response, e.g., ?subsystemonly=1&metric=callback_receiver_events_insert_db&node=awx-1 diff --git a/awx/api/views/metrics.py b/awx/api/views/metrics.py index dd40f11900..212acf3890 100644 --- a/awx/api/views/metrics.py +++ b/awx/api/views/metrics.py @@ -14,6 +14,7 @@ from rest_framework.exceptions import PermissionDenied # AWX # from awx.main.analytics import collectors +import awx.main.analytics.subsystem_metrics as s_metrics from awx.main.analytics.metrics import metrics from awx.api import renderers @@ -33,5 +34,10 @@ class MetricsView(APIView): def get(self, request): ''' Show Metrics Details ''' if request.user.is_superuser or request.user.is_system_auditor: - return Response(metrics().decode('UTF-8')) + metrics_to_show = '' + if not request.query_params.get('subsystemonly', "0") == "1": + metrics_to_show += metrics().decode('UTF-8') + if not request.query_params.get('dbonly', "0") == "1": + metrics_to_show += s_metrics.metrics(request) + return Response(metrics_to_show) raise PermissionDenied() diff --git a/awx/main/analytics/analytics_tasks.py b/awx/main/analytics/analytics_tasks.py new file mode 100644 index 0000000000..990cacfafb --- /dev/null +++ b/awx/main/analytics/analytics_tasks.py @@ -0,0 +1,14 @@ +# Python +import logging + +# AWX +from awx.main.analytics.subsystem_metrics import Metrics +from awx.main.dispatch.publish import task +from awx.main.dispatch import get_local_queuename + +logger = logging.getLogger('awx.main.scheduler') + + +@task(queue=get_local_queuename) +def send_subsystem_metrics(): + Metrics().send_metrics() diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py new file mode 100644 index 0000000000..b5ecf39e90 --- /dev/null +++ b/awx/main/analytics/subsystem_metrics.py @@ -0,0 +1,304 @@ +import redis +import json +import time +import logging + +from django.conf import settings +from django.apps import apps +from awx.main.consumers import emit_channel_notification + +root_key = 'awx_metrics' +logger = logging.getLogger('awx.main.wsbroadcast') + + +class BaseM: + def __init__(self, field, help_text): + self.field = field + self.help_text = help_text + self.current_value = 0 + + def clear_value(self, conn): + conn.hset(root_key, self.field, 0) + self.current_value = 0 + + def inc(self, value): + self.current_value += value + + def set(self, value): + self.current_value = value + + def decode(self, conn): + value = conn.hget(root_key, self.field) + return self.decode_value(value) + + def to_prometheus(self, instance_data): + output_text = f"# HELP {self.field} {self.help_text}\n# TYPE {self.field} gauge\n" + for instance in instance_data: + output_text += f'{self.field}{{node="{instance}"}} {instance_data[instance][self.field]}\n' + return output_text + + +class FloatM(BaseM): + def decode_value(self, value): + if value is not None: + return float(value) + else: + return 0.0 + + def store_value(self, conn): + conn.hincrbyfloat(root_key, self.field, self.current_value) + self.current_value = 0 + + +class IntM(BaseM): + def decode_value(self, value): + if value is not None: + return int(value) + else: + return 0 + + def store_value(self, conn): + conn.hincrby(root_key, self.field, self.current_value) + self.current_value = 0 + + +class SetIntM(BaseM): + def decode_value(self, value): + if value is not None: + return int(value) + else: + return 0 + + def store_value(self, conn): + # do not set value if it has not changed since last time this was called + if self.current_value is not None: + conn.hset(root_key, self.field, self.current_value) + self.current_value = None + + +class SetFloatM(SetIntM): + def decode_value(self, value): + if value is not None: + return float(value) + else: + return 0 + + +class HistogramM(BaseM): + def __init__(self, field, help_text, buckets): + self.buckets = buckets + self.buckets_to_keys = {} + for b in buckets: + self.buckets_to_keys[b] = IntM(field + '_' + str(b), '') + self.inf = IntM(field + '_inf', '') + self.sum = IntM(field + '_sum', '') + super(HistogramM, self).__init__(field, help_text) + + def clear_value(self, conn): + conn.hset(root_key, self.field, 0) + self.inf.clear_value(conn) + self.sum.clear_value(conn) + for b in self.buckets_to_keys.values(): + b.clear_value(conn) + super(HistogramM, self).clear_value(conn) + + def observe(self, value): + for b in self.buckets: + if value <= b: + self.buckets_to_keys[b].inc(1) + break + self.sum.inc(value) + self.inf.inc(1) + + def decode(self, conn): + values = {'counts': []} + for b in self.buckets_to_keys: + values['counts'].append(self.buckets_to_keys[b].decode(conn)) + values['sum'] = self.sum.decode(conn) + values['inf'] = self.inf.decode(conn) + return values + + def store_value(self, conn): + for b in self.buckets: + self.buckets_to_keys[b].store_value(conn) + self.sum.store_value(conn) + self.inf.store_value(conn) + + def to_prometheus(self, instance_data): + output_text = f"# HELP {self.field} {self.help_text}\n# TYPE {self.field} histogram\n" + for instance in instance_data: + for i, b in enumerate(self.buckets): + output_text += f'{self.field}_bucket{{le="{b}",node="{instance}"}} {sum(instance_data[instance][self.field]["counts"][0:i+1])}\n' + output_text += f'{self.field}_bucket{{le="+Inf",node="{instance}"}} {instance_data[instance][self.field]["inf"]}\n' + output_text += f'{self.field}_count{{node="{instance}"}} {instance_data[instance][self.field]["inf"]}\n' + output_text += f'{self.field}_sum{{node="{instance}"}} {instance_data[instance][self.field]["sum"]}\n' + return output_text + + +class Metrics: + def __init__(self, auto_pipe_execute=True): + self.pipe = redis.Redis.from_url(settings.BROKER_URL).pipeline() + self.conn = redis.Redis.from_url(settings.BROKER_URL) + self.last_pipe_execute = time.time() + # track if metrics have been modified since last saved to redis + # start with True so that we get an initial save to redis + self.metrics_have_changed = True + self.pipe_execute_interval = settings.SUBSYSTEM_METRICS_INTERVAL_SAVE_TO_REDIS + self.send_metrics_interval = settings.SUBSYSTEM_METRICS_INTERVAL_SEND_METRICS + # auto pipe execute will commit transaction of metric data to redis + # at a regular interval (pipe_execute_interval). If set to False, + # the calling function should call .pipe_execute() explicitly + self.auto_pipe_execute = auto_pipe_execute + Instance = apps.get_model('main', 'Instance') + self.instance_name = Instance.objects.me().hostname + + # metric name, help_text + METRICSLIST = [ + SetIntM('callback_receiver_events_queue_size_redis', 'Current number of events in redis queue'), + IntM('callback_receiver_events_popped_redis', 'Number of events popped from redis'), + IntM('callback_receiver_events_in_memory', 'Current number of events in memory (in transfer from redis to db)'), + IntM('callback_receiver_batch_events_errors', 'Number of times batch insertion failed'), + FloatM('callback_receiver_events_insert_db_seconds', 'Time spent saving events to database'), + IntM('callback_receiver_events_insert_db', 'Number of events batch inserted into database'), + HistogramM( + 'callback_receiver_batch_events_insert_db', 'Number of events batch inserted into database', settings.SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS + ), + FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'), + IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'), + FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'), + ] + # turn metric list into dictionary with the metric name as a key + self.METRICS = {} + for m in METRICSLIST: + self.METRICS[m.field] = m + + # track last time metrics were sent to other nodes + self.previous_send_metrics = SetFloatM('send_metrics_time', 'Timestamp of previous send_metrics call') + + def clear_values(self): + for m in self.METRICS.values(): + m.clear_value(self.conn) + self.metrics_have_changed = True + self.conn.delete(root_key + "_lock") + + def inc(self, field, value): + if value != 0: + self.METRICS[field].inc(value) + self.metrics_have_changed = True + if self.auto_pipe_execute is True and self.should_pipe_execute() is True: + self.pipe_execute() + + def set(self, field, value): + self.METRICS[field].set(value) + self.metrics_have_changed = True + if self.auto_pipe_execute is True and self.should_pipe_execute() is True: + self.pipe_execute() + + def observe(self, field, value): + self.METRICS[field].observe(value) + self.metrics_have_changed = True + if self.auto_pipe_execute is True and self.should_pipe_execute() is True: + self.pipe_execute() + + def serialize_local_metrics(self): + data = self.load_local_metrics() + return json.dumps(data) + + def load_local_metrics(self): + # generate python dictionary of key values from metrics stored in redis + data = {} + for field in self.METRICS: + data[field] = self.METRICS[field].decode(self.conn) + return data + + def store_metrics(self, data_json): + # called when receiving metrics from other instances + data = json.loads(data_json) + if self.instance_name != data['instance']: + logger.debug(f"{self.instance_name} received subsystem metrics from {data['instance']}") + self.conn.set(root_key + "_instance_" + data['instance'], data['metrics']) + + def should_pipe_execute(self): + if self.metrics_have_changed is False: + return False + if time.time() - self.last_pipe_execute > self.pipe_execute_interval: + return True + else: + return False + + def pipe_execute(self): + if self.metrics_have_changed is True: + duration_to_save = time.perf_counter() + for m in self.METRICS: + self.METRICS[m].store_value(self.pipe) + self.pipe.execute() + self.last_pipe_execute = time.time() + self.metrics_have_changed = False + duration_to_save = time.perf_counter() - duration_to_save + self.METRICS['subsystem_metrics_pipe_execute_seconds'].inc(duration_to_save) + self.METRICS['subsystem_metrics_pipe_execute_calls'].inc(1) + + duration_to_save = time.perf_counter() + self.send_metrics() + duration_to_save = time.perf_counter() - duration_to_save + self.METRICS['subsystem_metrics_send_metrics_seconds'].inc(duration_to_save) + + def send_metrics(self): + # more than one thread could be calling this at the same time, so should + # get acquire redis lock before sending metrics + lock = self.conn.lock(root_key + '_lock', thread_local=False) + if not lock.acquire(blocking=False): + return + try: + current_time = time.time() + if current_time - self.previous_send_metrics.decode(self.conn) > self.send_metrics_interval: + payload = { + 'instance': self.instance_name, + 'metrics': self.serialize_local_metrics(), + } + # store a local copy as well + self.store_metrics(json.dumps(payload)) + emit_channel_notification("metrics", payload) + self.previous_send_metrics.set(current_time) + self.previous_send_metrics.store_value(self.conn) + finally: + lock.release() + + def load_other_metrics(self, request): + # data received from other nodes are stored in their own keys + # e.g., awx_metrics_instance_awx-1, awx_metrics_instance_awx-2 + # this method looks for keys with "_instance_" in the name and loads the data + # also filters data based on request query params + # if additional filtering is added, update metrics_view.md + instances_filter = request.query_params.getlist("node") + # get a sorted list of instance names + instance_names = [self.instance_name] + for m in self.conn.scan_iter(root_key + '_instance_*'): + instance_names.append(m.decode('UTF-8').split('_instance_')[1]) + instance_names.sort() + # load data, including data from the this local instance + instance_data = {} + for instance in instance_names: + if len(instances_filter) == 0 or instance in instances_filter: + instance_data_from_redis = self.conn.get(root_key + '_instance_' + instance) + # data from other instances may not be available. That is OK. + if instance_data_from_redis: + instance_data[instance] = json.loads(instance_data_from_redis.decode('UTF-8')) + return instance_data + + def generate_metrics(self, request): + # takes the api request, filters, and generates prometheus data + # if additional filtering is added, update metrics_view.md + instance_data = self.load_other_metrics(request) + metrics_filter = request.query_params.getlist("metric") + output_text = '' + if instance_data: + for field in self.METRICS: + if len(metrics_filter) == 0 or field in metrics_filter: + output_text += self.METRICS[field].to_prometheus(instance_data) + return output_text + + +def metrics(request): + m = Metrics() + return m.generate_metrics(request) diff --git a/awx/main/consumers.py b/awx/main/consumers.py index a2425ec337..21ebe9d771 100644 --- a/awx/main/consumers.py +++ b/awx/main/consumers.py @@ -13,7 +13,6 @@ from channels.generic.websocket import AsyncJsonWebsocketConsumer from channels.layers import get_channel_layer from channels.db import database_sync_to_async - logger = logging.getLogger('awx.main.consumers') XRF_KEY = '_auth_user_xrf' diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index 68b8d5fd4f..acfb0bce02 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -20,7 +20,7 @@ from awx.main.models import JobEvent, AdHocCommandEvent, ProjectUpdateEvent, Inv from awx.main.tasks import handle_success_and_failure_notifications from awx.main.models.events import emit_event_detail from awx.main.utils.profiling import AWXProfiler - +import awx.main.analytics.subsystem_metrics as s_metrics from .base import BaseWorker logger = logging.getLogger('awx.main.commands.run_callback_receiver') @@ -46,16 +46,22 @@ class CallbackBrokerWorker(BaseWorker): self.buff = {} self.pid = os.getpid() self.redis = redis.Redis.from_url(settings.BROKER_URL) + self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False) + self.queue_pop = 0 + self.queue_name = settings.CALLBACK_QUEUE self.prof = AWXProfiler("CallbackBrokerWorker") for key in self.redis.keys('awx_callback_receiver_statistics_*'): self.redis.delete(key) def read(self, queue): try: - res = self.redis.blpop(settings.CALLBACK_QUEUE, timeout=1) + res = self.redis.blpop(self.queue_name, timeout=1) if res is None: return {'event': 'FLUSH'} self.total += 1 + self.queue_pop += 1 + self.subsystem_metrics.inc('callback_receiver_events_popped_redis', 1) + self.subsystem_metrics.inc('callback_receiver_events_in_memory', 1) return json.loads(res[1]) except redis.exceptions.RedisError: logger.exception("encountered an error communicating with redis") @@ -64,8 +70,19 @@ class CallbackBrokerWorker(BaseWorker): logger.exception("failed to decode JSON message from redis") finally: self.record_statistics() + self.record_read_metrics() + return {'event': 'FLUSH'} + def record_read_metrics(self): + if self.queue_pop == 0: + return + if self.subsystem_metrics.should_pipe_execute() is True: + queue_size = self.redis.llen(self.queue_name) + self.subsystem_metrics.set('callback_receiver_events_queue_size_redis', queue_size) + self.subsystem_metrics.pipe_execute() + self.queue_pop = 0 + def record_statistics(self): # buffer stat recording to once per (by default) 5s if time.time() - self.last_stats > settings.JOB_EVENT_STATISTICS_INTERVAL: @@ -99,27 +116,44 @@ class CallbackBrokerWorker(BaseWorker): def flush(self, force=False): now = tz_now() if force or (time.time() - self.last_flush) > settings.JOB_EVENT_BUFFER_SECONDS or any([len(events) >= 1000 for events in self.buff.values()]): + bulk_events_saved = 0 + singular_events_saved = 0 + metrics_events_batch_save_errors = 0 for cls, events in self.buff.items(): logger.debug(f'{cls.__name__}.objects.bulk_create({len(events)})') for e in events: if not e.created: e.created = now e.modified = now + duration_to_save = time.perf_counter() try: cls.objects.bulk_create(events) + bulk_events_saved += len(events) except Exception: # if an exception occurs, we should re-attempt to save the # events one-by-one, because something in the list is # broken/stale + metrics_events_batch_save_errors += 1 for e in events: try: e.save() + singular_events_saved += 1 except Exception: logger.exception('Database Error Saving Job Event') + duration_to_save = time.perf_counter() - duration_to_save for e in events: emit_event_detail(e) self.buff = {} self.last_flush = time.time() + # only update metrics if we saved events + if (bulk_events_saved + singular_events_saved) > 0: + self.subsystem_metrics.inc('callback_receiver_batch_events_errors', metrics_events_batch_save_errors) + self.subsystem_metrics.inc('callback_receiver_events_insert_db_seconds', duration_to_save) + self.subsystem_metrics.inc('callback_receiver_events_insert_db', bulk_events_saved + singular_events_saved) + self.subsystem_metrics.observe('callback_receiver_batch_events_insert_db', bulk_events_saved) + self.subsystem_metrics.inc('callback_receiver_events_in_memory', -(bulk_events_saved + singular_events_saved)) + if self.subsystem_metrics.should_pipe_execute() is True: + self.subsystem_metrics.pipe_execute() def perform_work(self, body): try: @@ -169,6 +203,7 @@ class CallbackBrokerWorker(BaseWorker): except Exception: logger.exception('Worker failed to emit notifications: Job {}'.format(job_identifier)) finally: + self.subsystem_metrics.inc('callback_receiver_events_in_memory', -1) GuidMiddleware.set_guid('') return diff --git a/awx/main/queue.py b/awx/main/queue.py index 88fc2c8288..ebac0622e4 100644 --- a/awx/main/queue.py +++ b/awx/main/queue.py @@ -8,7 +8,7 @@ import redis # Django from django.conf import settings - +import awx.main.analytics.subsystem_metrics as s_metrics __all__ = ['CallbackQueueDispatcher'] @@ -28,6 +28,7 @@ class CallbackQueueDispatcher(object): self.queue = getattr(settings, 'CALLBACK_QUEUE', '') self.logger = logging.getLogger('awx.main.queue.CallbackQueueDispatcher') self.connection = redis.Redis.from_url(settings.BROKER_URL) + self.subsystem_metrics = s_metrics.Metrics() def dispatch(self, obj): self.connection.rpush(self.queue, json.dumps(obj, cls=AnsibleJSONEncoder)) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index de1b15377b..7945b8e275 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -107,6 +107,7 @@ from awx.main.consumers import emit_channel_notification from awx.main import analytics from awx.conf import settings_registry from awx.conf.license import get_license +from awx.main.analytics.subsystem_metrics import Metrics from rest_framework.exceptions import PermissionDenied @@ -170,6 +171,7 @@ def dispatch_startup(): cluster_node_heartbeat() if Instance.objects.me().is_controller(): awx_isolated_heartbeat() + Metrics().clear_values() # Update Tower's rsyslog.conf file based on loggins settings in the db reconfigure_rsyslog() diff --git a/awx/main/tests/functional/analytics/test_metrics.py b/awx/main/tests/functional/analytics/test_metrics.py index 94076d1362..442c83699c 100644 --- a/awx/main/tests/functional/analytics/test_metrics.py +++ b/awx/main/tests/functional/analytics/test_metrics.py @@ -56,24 +56,28 @@ def test_metrics_counts(organization_factory, job_template_factory, workflow_job assert EXPECTED_VALUES[name] == value +def get_metrics_view_db_only(): + return reverse('api:metrics_view') + '?dbonly=1' + + @pytest.mark.django_db def test_metrics_permissions(get, admin, org_admin, alice, bob, organization): - assert get(reverse('api:metrics_view'), user=admin).status_code == 200 - assert get(reverse('api:metrics_view'), user=org_admin).status_code == 403 - assert get(reverse('api:metrics_view'), user=alice).status_code == 403 - assert get(reverse('api:metrics_view'), user=bob).status_code == 403 + assert get(get_metrics_view_db_only(), user=admin).status_code == 200 + assert get(get_metrics_view_db_only(), user=org_admin).status_code == 403 + assert get(get_metrics_view_db_only(), user=alice).status_code == 403 + assert get(get_metrics_view_db_only(), user=bob).status_code == 403 organization.auditor_role.members.add(bob) - assert get(reverse('api:metrics_view'), user=bob).status_code == 403 + assert get(get_metrics_view_db_only(), user=bob).status_code == 403 Role.singleton('system_auditor').members.add(bob) bob.is_system_auditor = True - assert get(reverse('api:metrics_view'), user=bob).status_code == 200 + assert get(get_metrics_view_db_only(), user=bob).status_code == 200 @pytest.mark.django_db def test_metrics_http_methods(get, post, patch, put, options, admin): - assert get(reverse('api:metrics_view'), user=admin).status_code == 200 - assert put(reverse('api:metrics_view'), user=admin).status_code == 405 - assert patch(reverse('api:metrics_view'), user=admin).status_code == 405 - assert post(reverse('api:metrics_view'), user=admin).status_code == 405 - assert options(reverse('api:metrics_view'), user=admin).status_code == 200 + assert get(get_metrics_view_db_only(), user=admin).status_code == 200 + assert put(get_metrics_view_db_only(), user=admin).status_code == 405 + assert patch(get_metrics_view_db_only(), user=admin).status_code == 405 + assert post(get_metrics_view_db_only(), user=admin).status_code == 405 + assert options(get_metrics_view_db_only(), user=admin).status_code == 200 diff --git a/awx/main/wsbroadcast.py b/awx/main/wsbroadcast.py index e2ee9fc431..184ae06122 100644 --- a/awx/main/wsbroadcast.py +++ b/awx/main/wsbroadcast.py @@ -15,7 +15,7 @@ from awx.main.analytics.broadcast_websocket import ( BroadcastWebsocketStats, BroadcastWebsocketStatsManager, ) - +import awx.main.analytics.subsystem_metrics as s_metrics logger = logging.getLogger('awx.main.wsbroadcast') @@ -68,6 +68,7 @@ class WebsocketTask: self.protocol = protocol self.verify_ssl = verify_ssl self.channel_layer = None + self.subsystem_metrics = s_metrics.Metrics() async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse): raise RuntimeError("Implement me") @@ -144,9 +145,10 @@ class BroadcastWebsocketTask(WebsocketTask): logmsg = "{} {}".format(logmsg, payload) logger.warn(logmsg) continue - (group, message) = unwrap_broadcast_msg(payload) - + if group == "metrics": + self.subsystem_metrics.store_metrics(message) + continue await self.channel_layer.group_send(group, {"type": "internal.message", "text": message}) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index e51f66007d..2daa33d4b3 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -224,6 +224,15 @@ JOB_EVENT_MAX_QUEUE_SIZE = 10000 # The number of job events to migrate per-transaction when moving from int -> bigint JOB_EVENT_MIGRATION_CHUNK_SIZE = 1000000 +# Histogram buckets for the callback_receiver_batch_events_insert_db metric +SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS = [10, 50, 150, 350, 650, 2000] + +# Interval in seconds for sending local metrics to other nodes +SUBSYSTEM_METRICS_INTERVAL_SEND_METRICS = 3 + +# Interval in seconds for saving local metrics to redis +SUBSYSTEM_METRICS_INTERVAL_SAVE_TO_REDIS = 2 + # The maximum allowed jobs to start on a given task manager cycle START_TASK_LIMIT = 100 @@ -427,6 +436,7 @@ CELERYBEAT_SCHEDULE = { 'gather_analytics': {'task': 'awx.main.tasks.gather_analytics', 'schedule': timedelta(minutes=5)}, 'task_manager': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, 'k8s_reaper': {'task': 'awx.main.tasks.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}}, + 'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)}, # 'isolated_heartbeat': set up at the end of production.py and development.py }