mirror of
https://github.com/ansible/awx.git
synced 2026-03-11 06:29:31 -02:30
Per-service metrics http server
* Organize metrics into their respective services * Serve per-service metrics on a per-service HTTP server * Increase prometheus client usage over our custom metrics fields
This commit is contained in:
committed by
Chris Meyers
parent
6dcaa09dfb
commit
8a902debd5
@@ -2,7 +2,7 @@
|
|||||||
import logging
|
import logging
|
||||||
|
|
||||||
# AWX
|
# AWX
|
||||||
from awx.main.analytics.subsystem_metrics import Metrics
|
from awx.main.analytics.subsystem_metrics import DispatcherMetrics, CallbackReceiverMetrics
|
||||||
from awx.main.dispatch.publish import task
|
from awx.main.dispatch.publish import task
|
||||||
from awx.main.dispatch import get_task_queuename
|
from awx.main.dispatch import get_task_queuename
|
||||||
|
|
||||||
@@ -11,4 +11,5 @@ logger = logging.getLogger('awx.main.scheduler')
|
|||||||
|
|
||||||
@task(queue=get_task_queuename)
|
@task(queue=get_task_queuename)
|
||||||
def send_subsystem_metrics():
|
def send_subsystem_metrics():
|
||||||
Metrics().send_metrics()
|
DispatcherMetrics().send_metrics()
|
||||||
|
CallbackReceiverMetrics().send_metrics()
|
||||||
|
|||||||
@@ -1,10 +1,15 @@
|
|||||||
|
import itertools
|
||||||
import redis
|
import redis
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
import prometheus_client
|
||||||
|
from prometheus_client.core import GaugeMetricFamily, HistogramMetricFamily
|
||||||
|
from prometheus_client.registry import CollectorRegistry
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.apps import apps
|
from django.http import HttpRequest
|
||||||
|
from rest_framework.request import Request
|
||||||
|
|
||||||
from awx.main.consumers import emit_channel_notification
|
from awx.main.consumers import emit_channel_notification
|
||||||
from awx.main.utils import is_testing
|
from awx.main.utils import is_testing
|
||||||
@@ -13,6 +18,30 @@ root_key = settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX
|
|||||||
logger = logging.getLogger('awx.main.analytics')
|
logger = logging.getLogger('awx.main.analytics')
|
||||||
|
|
||||||
|
|
||||||
|
class MetricsNamespace:
    """Base class for objects that are scoped to one metrics namespace."""

    def __init__(self, namespace):
        # Name of the service namespace this object belongs to.
        self._namespace = namespace
|
||||||
|
|
||||||
|
|
||||||
|
class MetricsServerSettings(MetricsNamespace):
    """Look up metrics HTTP server settings for this object's namespace."""

    def port(self):
        """Return the 'port' configured for this namespace.

        The value is read from settings.METRICS_SUBSYSTEM_CONFIG under the
        'server' key, indexed by the namespace name.
        """
        server_config = settings.METRICS_SUBSYSTEM_CONFIG['server']
        return server_config[self._namespace]['port']
|
||||||
|
|
||||||
|
|
||||||
|
class MetricsServer(MetricsServerSettings):
    """Prometheus HTTP server that serves one registry for one namespace."""

    def __init__(self, namespace, registry):
        """Remember the namespace and the prometheus registry to serve."""
        MetricsNamespace.__init__(self, namespace)
        self._registry = registry

    def start(self):
        """Start the prometheus HTTP server on localhost at the configured port.

        Any exception raised while starting is logged (with traceback) and
        then re-raised to the caller.
        """
        try:
            # TODO: addr for ipv6 ?
            prometheus_client.start_http_server(self.port(), addr='localhost', registry=self._registry)
        except Exception:
            # logger.exception keeps the traceback; the closing quote around the
            # service name was missing in the previous message.
            logger.exception(f"MetricsServer failed to start for service '{self._namespace}'.")
            raise
|
||||||
|
|
||||||
|
|
||||||
class BaseM:
|
class BaseM:
|
||||||
def __init__(self, field, help_text):
|
def __init__(self, field, help_text):
|
||||||
self.field = field
|
self.field = field
|
||||||
@@ -148,76 +177,40 @@ class HistogramM(BaseM):
|
|||||||
return output_text
|
return output_text
|
||||||
|
|
||||||
|
|
||||||
class Metrics:
|
class Metrics(MetricsNamespace):
|
||||||
def __init__(self, auto_pipe_execute=False, instance_name=None):
|
# metric name, help_text
|
||||||
|
METRICSLIST = []
|
||||||
|
_METRICSLIST = [
|
||||||
|
FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'),
|
||||||
|
IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'),
|
||||||
|
FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'),
|
||||||
|
]
|
||||||
|
|
||||||
|
def __init__(self, namespace, auto_pipe_execute=False, instance_name=None, metrics_have_changed=True, **kwargs):
|
||||||
|
MetricsNamespace.__init__(self, namespace)
|
||||||
|
|
||||||
self.pipe = redis.Redis.from_url(settings.BROKER_URL).pipeline()
|
self.pipe = redis.Redis.from_url(settings.BROKER_URL).pipeline()
|
||||||
self.conn = redis.Redis.from_url(settings.BROKER_URL)
|
self.conn = redis.Redis.from_url(settings.BROKER_URL)
|
||||||
self.last_pipe_execute = time.time()
|
self.last_pipe_execute = time.time()
|
||||||
# track if metrics have been modified since last saved to redis
|
# track if metrics have been modified since last saved to redis
|
||||||
# start with True so that we get an initial save to redis
|
# start with True so that we get an initial save to redis
|
||||||
self.metrics_have_changed = True
|
self.metrics_have_changed = metrics_have_changed
|
||||||
self.pipe_execute_interval = settings.SUBSYSTEM_METRICS_INTERVAL_SAVE_TO_REDIS
|
self.pipe_execute_interval = settings.SUBSYSTEM_METRICS_INTERVAL_SAVE_TO_REDIS
|
||||||
self.send_metrics_interval = settings.SUBSYSTEM_METRICS_INTERVAL_SEND_METRICS
|
self.send_metrics_interval = settings.SUBSYSTEM_METRICS_INTERVAL_SEND_METRICS
|
||||||
# auto pipe execute will commit transaction of metric data to redis
|
# auto pipe execute will commit transaction of metric data to redis
|
||||||
# at a regular interval (pipe_execute_interval). If set to False,
|
# at a regular interval (pipe_execute_interval). If set to False,
|
||||||
# the calling function should call .pipe_execute() explicitly
|
# the calling function should call .pipe_execute() explicitly
|
||||||
self.auto_pipe_execute = auto_pipe_execute
|
self.auto_pipe_execute = auto_pipe_execute
|
||||||
Instance = apps.get_model('main', 'Instance')
|
|
||||||
if instance_name:
|
if instance_name:
|
||||||
self.instance_name = instance_name
|
self.instance_name = instance_name
|
||||||
elif is_testing():
|
elif is_testing():
|
||||||
self.instance_name = "awx_testing"
|
self.instance_name = "awx_testing"
|
||||||
else:
|
else:
|
||||||
self.instance_name = Instance.objects.my_hostname()
|
self.instance_name = settings.CLUSTER_HOST_ID # Same as Instance.objects.my_hostname() BUT we do not need to import Instance
|
||||||
|
|
||||||
# metric name, help_text
|
|
||||||
METRICSLIST = [
|
|
||||||
SetIntM('callback_receiver_events_queue_size_redis', 'Current number of events in redis queue'),
|
|
||||||
IntM('callback_receiver_events_popped_redis', 'Number of events popped from redis'),
|
|
||||||
IntM('callback_receiver_events_in_memory', 'Current number of events in memory (in transfer from redis to db)'),
|
|
||||||
IntM('callback_receiver_batch_events_errors', 'Number of times batch insertion failed'),
|
|
||||||
FloatM('callback_receiver_events_insert_db_seconds', 'Total time spent saving events to database'),
|
|
||||||
IntM('callback_receiver_events_insert_db', 'Number of events batch inserted into database'),
|
|
||||||
IntM('callback_receiver_events_broadcast', 'Number of events broadcast to other control plane nodes'),
|
|
||||||
HistogramM(
|
|
||||||
'callback_receiver_batch_events_insert_db', 'Number of events batch inserted into database', settings.SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS
|
|
||||||
),
|
|
||||||
SetFloatM('callback_receiver_event_processing_avg_seconds', 'Average processing time per event per callback receiver batch'),
|
|
||||||
FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'),
|
|
||||||
IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'),
|
|
||||||
FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'),
|
|
||||||
SetFloatM('task_manager_get_tasks_seconds', 'Time spent in loading tasks from db'),
|
|
||||||
SetFloatM('task_manager_start_task_seconds', 'Time spent starting task'),
|
|
||||||
SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'),
|
|
||||||
SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'),
|
|
||||||
SetFloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
|
|
||||||
IntM('task_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'),
|
|
||||||
SetFloatM('task_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
|
|
||||||
SetIntM('task_manager_tasks_started', 'Number of tasks started'),
|
|
||||||
SetIntM('task_manager_running_processed', 'Number of running tasks processed'),
|
|
||||||
SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'),
|
|
||||||
SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'),
|
|
||||||
SetFloatM('task_manager_commit_seconds', 'Time spent in db transaction, including on_commit calls'),
|
|
||||||
SetFloatM('dependency_manager_get_tasks_seconds', 'Time spent loading pending tasks from db'),
|
|
||||||
SetFloatM('dependency_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'),
|
|
||||||
SetFloatM('dependency_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
|
|
||||||
IntM('dependency_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'),
|
|
||||||
SetFloatM('dependency_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
|
|
||||||
SetIntM('dependency_manager_pending_processed', 'Number of pending tasks processed'),
|
|
||||||
SetFloatM('workflow_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
|
|
||||||
IntM('workflow_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'),
|
|
||||||
SetFloatM('workflow_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
|
|
||||||
SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow tasks'),
|
|
||||||
SetFloatM('workflow_manager_get_tasks_seconds', 'Time spent loading workflow tasks from db'),
|
|
||||||
# dispatcher subsystem metrics
|
|
||||||
SetIntM('dispatcher_pool_scale_up_events', 'Number of times local dispatcher scaled up a worker since startup'),
|
|
||||||
SetIntM('dispatcher_pool_active_task_count', 'Number of active tasks in the worker pool when last task was submitted'),
|
|
||||||
SetIntM('dispatcher_pool_max_worker_count', 'Highest number of workers in worker pool in last collection interval, about 20s'),
|
|
||||||
SetFloatM('dispatcher_availability', 'Fraction of time (in last collection interval) dispatcher was able to receive messages'),
|
|
||||||
]
|
|
||||||
# turn metric list into dictionary with the metric name as a key
|
# turn metric list into dictionary with the metric name as a key
|
||||||
self.METRICS = {}
|
self.METRICS = {}
|
||||||
for m in METRICSLIST:
|
for m in itertools.chain(self.METRICSLIST, self._METRICSLIST):
|
||||||
self.METRICS[m.field] = m
|
self.METRICS[m.field] = m
|
||||||
|
|
||||||
# track last time metrics were sent to other nodes
|
# track last time metrics were sent to other nodes
|
||||||
@@ -230,7 +223,7 @@ class Metrics:
|
|||||||
m.reset_value(self.conn)
|
m.reset_value(self.conn)
|
||||||
self.metrics_have_changed = True
|
self.metrics_have_changed = True
|
||||||
self.conn.delete(root_key + "_lock")
|
self.conn.delete(root_key + "_lock")
|
||||||
for m in self.conn.scan_iter(root_key + '_instance_*'):
|
for m in self.conn.scan_iter(root_key + '-' + self._namespace + '_instance_*'):
|
||||||
self.conn.delete(m)
|
self.conn.delete(m)
|
||||||
|
|
||||||
def inc(self, field, value):
|
def inc(self, field, value):
|
||||||
@@ -297,7 +290,7 @@ class Metrics:
|
|||||||
def send_metrics(self):
|
def send_metrics(self):
|
||||||
# more than one thread could be calling this at the same time, so should
|
# more than one thread could be calling this at the same time, so should
|
||||||
# acquire redis lock before sending metrics
|
# acquire redis lock before sending metrics
|
||||||
lock = self.conn.lock(root_key + '_lock')
|
lock = self.conn.lock(root_key + '-' + self._namespace + '_lock')
|
||||||
if not lock.acquire(blocking=False):
|
if not lock.acquire(blocking=False):
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
@@ -307,9 +300,10 @@ class Metrics:
|
|||||||
payload = {
|
payload = {
|
||||||
'instance': self.instance_name,
|
'instance': self.instance_name,
|
||||||
'metrics': serialized_metrics,
|
'metrics': serialized_metrics,
|
||||||
|
'metrics_namespace': self._namespace,
|
||||||
}
|
}
|
||||||
# store the serialized data locally as well, so that load_other_metrics will read it
|
# store the serialized data locally as well, so that load_other_metrics will read it
|
||||||
self.conn.set(root_key + '_instance_' + self.instance_name, serialized_metrics)
|
self.conn.set(root_key + '-' + self._namespace + '_instance_' + self.instance_name, serialized_metrics)
|
||||||
emit_channel_notification("metrics", payload)
|
emit_channel_notification("metrics", payload)
|
||||||
|
|
||||||
self.previous_send_metrics.set(current_time)
|
self.previous_send_metrics.set(current_time)
|
||||||
@@ -331,14 +325,14 @@ class Metrics:
|
|||||||
instances_filter = request.query_params.getlist("node")
|
instances_filter = request.query_params.getlist("node")
|
||||||
# get a sorted list of instance names
|
# get a sorted list of instance names
|
||||||
instance_names = [self.instance_name]
|
instance_names = [self.instance_name]
|
||||||
for m in self.conn.scan_iter(root_key + '_instance_*'):
|
for m in self.conn.scan_iter(root_key + '-' + self._namespace + '_instance_*'):
|
||||||
instance_names.append(m.decode('UTF-8').split('_instance_')[1])
|
instance_names.append(m.decode('UTF-8').split('_instance_')[1])
|
||||||
instance_names.sort()
|
instance_names.sort()
|
||||||
# load data, including data from this local instance
|
# load data, including data from this local instance
|
||||||
instance_data = {}
|
instance_data = {}
|
||||||
for instance in instance_names:
|
for instance in instance_names:
|
||||||
if len(instances_filter) == 0 or instance in instances_filter:
|
if len(instances_filter) == 0 or instance in instances_filter:
|
||||||
instance_data_from_redis = self.conn.get(root_key + '_instance_' + instance)
|
instance_data_from_redis = self.conn.get(root_key + '-' + self._namespace + '_instance_' + instance)
|
||||||
# data from other instances may not be available. That is OK.
|
# data from other instances may not be available. That is OK.
|
||||||
if instance_data_from_redis:
|
if instance_data_from_redis:
|
||||||
instance_data[instance] = json.loads(instance_data_from_redis.decode('UTF-8'))
|
instance_data[instance] = json.loads(instance_data_from_redis.decode('UTF-8'))
|
||||||
@@ -357,6 +351,120 @@ class Metrics:
|
|||||||
return output_text
|
return output_text
|
||||||
|
|
||||||
|
|
||||||
|
class DispatcherMetrics(Metrics):
    """Metrics recorded under the dispatcher service namespace.

    Covers the task manager, dependency manager, workflow manager and the
    dispatcher worker pool.
    """

    # metric name, help_text
    METRICSLIST = [
        SetFloatM('task_manager_get_tasks_seconds', 'Time spent in loading tasks from db'),
        SetFloatM('task_manager_start_task_seconds', 'Time spent starting task'),
        SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'),
        SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'),
        SetFloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
        IntM('task_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'),
        SetFloatM('task_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
        SetIntM('task_manager_tasks_started', 'Number of tasks started'),
        SetIntM('task_manager_running_processed', 'Number of running tasks processed'),
        SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'),
        SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'),
        SetFloatM('task_manager_commit_seconds', 'Time spent in db transaction, including on_commit calls'),
        SetFloatM('dependency_manager_get_tasks_seconds', 'Time spent loading pending tasks from db'),
        SetFloatM('dependency_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'),
        SetFloatM('dependency_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
        IntM('dependency_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'),
        SetFloatM('dependency_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
        SetIntM('dependency_manager_pending_processed', 'Number of pending tasks processed'),
        SetFloatM('workflow_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
        IntM('workflow_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'),
        SetFloatM('workflow_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
        SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow tasks'),
        SetFloatM('workflow_manager_get_tasks_seconds', 'Time spent loading workflow tasks from db'),
        # dispatcher subsystem metrics
        SetIntM('dispatcher_pool_scale_up_events', 'Number of times local dispatcher scaled up a worker since startup'),
        SetIntM('dispatcher_pool_active_task_count', 'Number of active tasks in the worker pool when last task was submitted'),
        SetIntM('dispatcher_pool_max_worker_count', 'Highest number of workers in worker pool in last collection interval, about 20s'),
        SetFloatM('dispatcher_availability', 'Fraction of time (in last collection interval) dispatcher was able to receive messages'),
    ]

    def __init__(self, *args, **kwargs):
        """Build a Metrics object bound to the dispatcher namespace.

        Remaining positional and keyword arguments are forwarded unchanged to
        Metrics.__init__.
        """
        super().__init__(settings.METRICS_SERVICE_DISPATCHER, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class CallbackReceiverMetrics(Metrics):
    """Metrics recorded under the callback receiver service namespace."""

    # metric name, help_text
    METRICSLIST = [
        SetIntM('callback_receiver_events_queue_size_redis', 'Current number of events in redis queue'),
        IntM('callback_receiver_events_popped_redis', 'Number of events popped from redis'),
        IntM('callback_receiver_events_in_memory', 'Current number of events in memory (in transfer from redis to db)'),
        IntM('callback_receiver_batch_events_errors', 'Number of times batch insertion failed'),
        FloatM('callback_receiver_events_insert_db_seconds', 'Total time spent saving events to database'),
        IntM('callback_receiver_events_insert_db', 'Number of events batch inserted into database'),
        IntM('callback_receiver_events_broadcast', 'Number of events broadcast to other control plane nodes'),
        HistogramM(
            'callback_receiver_batch_events_insert_db', 'Number of events batch inserted into database', settings.SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS
        ),
        SetFloatM('callback_receiver_event_processing_avg_seconds', 'Average processing time per event per callback receiver batch'),
    ]

    def __init__(self, *args, **kwargs):
        """Build a Metrics object bound to the callback receiver namespace.

        Remaining positional and keyword arguments are forwarded unchanged to
        Metrics.__init__.
        """
        super().__init__(settings.METRICS_SERVICE_CALLBACK_RECEIVER, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
def metrics(request):
|
def metrics(request):
|
||||||
m = Metrics()
|
output_text = ''
|
||||||
return m.generate_metrics(request)
|
for m in [DispatcherMetrics(), CallbackReceiverMetrics()]:
|
||||||
|
output_text += m.generate_metrics(request)
|
||||||
|
return output_text
|
||||||
|
|
||||||
|
|
||||||
|
class CustomToPrometheusMetricsCollector(prometheus_client.registry.Collector):
    """
    Takes the metric data from redis -> our custom metric fields -> prometheus
    library metric fields.

    The plan is to get rid of the use of redis, our custom metric fields, and
    to switch fully to the prometheus library. At that point, this translation
    code will be deleted.
    """

    def __init__(self, metrics_obj, *args, **kwargs):
        """Wrap ``metrics_obj``, the Metrics instance whose data is translated."""
        super().__init__(*args, **kwargs)
        self._metrics = metrics_obj

    def collect(self):
        """Yield one prometheus metric family per custom metric of this host.

        Data is loaded through ``load_other_metrics`` and only the entry for
        settings.CLUSTER_HOST_ID is exported. Nothing is yielded when there is
        no data at all, or no data for this host. Histogram metrics become a
        HistogramMetricFamily; every other metric becomes a GaugeMetricFamily.
        """
        my_hostname = settings.CLUSTER_HOST_ID
        namespace = self._metrics._namespace

        instance_data = self._metrics.load_other_metrics(Request(HttpRequest()))
        if not instance_data:
            logger.debug(f"No metric data found in redis for metric namespace '{namespace}'")
            return

        # Other hosts may have reported while this one has not yet; without
        # this guard the lookups below would fail on None.
        host_metrics = instance_data.get(my_hostname)
        if not host_metrics:
            logger.debug(f"No metric data found in redis for host '{my_hostname}' in metric namespace '{namespace}'")
            return

        for metric in self._metrics.METRICS.values():
            entry = host_metrics.get(metric.field)
            is_histogram = isinstance(metric, HistogramM)
            # A scalar value of 0 is a real sample and must be exported; only
            # a missing key (or an empty histogram payload) is skipped.
            if entry is None or (is_histogram and not entry):
                # Serializing the whole payload is costly, so only do it when
                # debug logging is actually enabled.
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f"{namespace} metric '{metric.field}' not found in redis data payload {json.dumps(instance_data, indent=2)}")
                continue
            if is_histogram:
                buckets = [[str(upper_bound), str(count)] for upper_bound, count in zip(metric.buckets, entry['counts'])]
                yield HistogramMetricFamily(metric.field, metric.help_text, buckets=buckets, sum_value=entry['sum'])
            else:
                yield GaugeMetricFamily(metric.field, metric.help_text, value=entry)
|
||||||
|
|
||||||
|
|
||||||
|
class CallbackReceiverMetricsServer(MetricsServer):
    """Metrics HTTP server for the callback receiver service."""

    def __init__(self):
        """Create a registry exporting the callback receiver metrics."""
        registry = CollectorRegistry(auto_describe=True)
        # Export the metrics of the same service whose namespace (and port)
        # this server uses; the dispatcher metrics were registered here before.
        # metrics_have_changed=False is passed through to the Metrics constructor.
        registry.register(CustomToPrometheusMetricsCollector(CallbackReceiverMetrics(metrics_have_changed=False)))
        super().__init__(settings.METRICS_SERVICE_CALLBACK_RECEIVER, registry)
|
||||||
|
|
||||||
|
|
||||||
|
class DispatcherMetricsServer(MetricsServer):
    """Metrics HTTP server for the dispatcher service."""

    def __init__(self):
        """Create a registry exporting the dispatcher metrics."""
        registry = CollectorRegistry(auto_describe=True)
        # Export the metrics of the same service whose namespace (and port)
        # this server uses; the callback receiver metrics were registered here before.
        # metrics_have_changed=False is passed through to the Metrics constructor.
        registry.register(CustomToPrometheusMetricsCollector(DispatcherMetrics(metrics_have_changed=False)))
        super().__init__(settings.METRICS_SERVICE_DISPATCHER, registry)
|
||||||
|
|
||||||
|
|
||||||
|
class WebsocketsMetricsServer(MetricsServer):
    """Metrics HTTP server for the websockets service.

    No collector is registered on its registry yet.
    """

    def __init__(self):
        """Create an empty registry bound to the websockets namespace."""
        registry = CollectorRegistry(auto_describe=True)
        # TODO: register a collector for websockets metrics once one exists.
        super().__init__(settings.METRICS_SERVICE_WEBSOCKETS, registry)
|
||||||
|
|||||||
@@ -106,7 +106,7 @@ class RelayConsumer(AsyncJsonWebsocketConsumer):
|
|||||||
if group == "metrics":
|
if group == "metrics":
|
||||||
message = json.loads(message['text'])
|
message = json.loads(message['text'])
|
||||||
conn = redis.Redis.from_url(settings.BROKER_URL)
|
conn = redis.Redis.from_url(settings.BROKER_URL)
|
||||||
conn.set(settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX + "_instance_" + message['instance'], message['metrics'])
|
conn.set(settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX + "-" + message['metrics_namespace'] + "_instance_" + message['instance'], message['metrics'])
|
||||||
else:
|
else:
|
||||||
await self.channel_layer.group_send(group, message)
|
await self.channel_layer.group_send(group, message)
|
||||||
|
|
||||||
|
|||||||
@@ -168,7 +168,7 @@ class AWXConsumerPG(AWXConsumerBase):
|
|||||||
init_time = time.time()
|
init_time = time.time()
|
||||||
self.pg_down_time = init_time - self.pg_max_wait # allow no grace period
|
self.pg_down_time = init_time - self.pg_max_wait # allow no grace period
|
||||||
self.last_cleanup = init_time
|
self.last_cleanup = init_time
|
||||||
self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
|
self.subsystem_metrics = s_metrics.DispatcherMetrics(auto_pipe_execute=False)
|
||||||
self.last_metrics_gather = init_time
|
self.last_metrics_gather = init_time
|
||||||
self.listen_cumulative_time = 0.0
|
self.listen_cumulative_time = 0.0
|
||||||
if schedule:
|
if schedule:
|
||||||
|
|||||||
@@ -72,7 +72,7 @@ class CallbackBrokerWorker(BaseWorker):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.buff = {}
|
self.buff = {}
|
||||||
self.redis = redis.Redis.from_url(settings.BROKER_URL)
|
self.redis = redis.Redis.from_url(settings.BROKER_URL)
|
||||||
self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
|
self.subsystem_metrics = s_metrics.CallbackReceiverMetrics(auto_pipe_execute=False)
|
||||||
self.queue_pop = 0
|
self.queue_pop = 0
|
||||||
self.queue_name = settings.CALLBACK_QUEUE
|
self.queue_name = settings.CALLBACK_QUEUE
|
||||||
self.prof = AWXProfiler("CallbackBrokerWorker")
|
self.prof = AWXProfiler("CallbackBrokerWorker")
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.core.management.base import BaseCommand
|
from django.core.management.base import BaseCommand
|
||||||
|
from awx.main.analytics.subsystem_metrics import CallbackReceiverMetricsServer
|
||||||
|
|
||||||
from awx.main.dispatch.control import Control
|
from awx.main.dispatch.control import Control
|
||||||
from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker
|
from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker
|
||||||
@@ -25,6 +26,9 @@ class Command(BaseCommand):
|
|||||||
print(Control('callback_receiver').status())
|
print(Control('callback_receiver').status())
|
||||||
return
|
return
|
||||||
consumer = None
|
consumer = None
|
||||||
|
|
||||||
|
CallbackReceiverMetricsServer().start()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
consumer = AWXConsumerRedis(
|
consumer = AWXConsumerRedis(
|
||||||
'callback_receiver',
|
'callback_receiver',
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ from awx.main.dispatch import get_task_queuename
|
|||||||
from awx.main.dispatch.control import Control
|
from awx.main.dispatch.control import Control
|
||||||
from awx.main.dispatch.pool import AutoscalePool
|
from awx.main.dispatch.pool import AutoscalePool
|
||||||
from awx.main.dispatch.worker import AWXConsumerPG, TaskWorker
|
from awx.main.dispatch.worker import AWXConsumerPG, TaskWorker
|
||||||
|
from awx.main.analytics.subsystem_metrics import DispatcherMetricsServer
|
||||||
|
|
||||||
logger = logging.getLogger('awx.main.dispatch')
|
logger = logging.getLogger('awx.main.dispatch')
|
||||||
|
|
||||||
@@ -62,6 +63,8 @@ class Command(BaseCommand):
|
|||||||
|
|
||||||
consumer = None
|
consumer = None
|
||||||
|
|
||||||
|
DispatcherMetricsServer().start()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
|
queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
|
||||||
consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4), schedule=settings.CELERYBEAT_SCHEDULE)
|
consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4), schedule=settings.CELERYBEAT_SCHEDULE)
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ from awx.main.analytics.broadcast_websocket import (
|
|||||||
RelayWebsocketStatsManager,
|
RelayWebsocketStatsManager,
|
||||||
safe_name,
|
safe_name,
|
||||||
)
|
)
|
||||||
|
from awx.main.analytics.subsystem_metrics import WebsocketsMetricsServer
|
||||||
from awx.main.wsrelay import WebSocketRelayManager
|
from awx.main.wsrelay import WebSocketRelayManager
|
||||||
|
|
||||||
|
|
||||||
@@ -91,6 +92,8 @@ class Command(BaseCommand):
|
|||||||
return host_stats
|
return host_stats
|
||||||
|
|
||||||
def handle(self, *arg, **options):
|
def handle(self, *arg, **options):
|
||||||
|
WebsocketsMetricsServer().start()
|
||||||
|
|
||||||
# it's necessary to delay this import in case
|
# it's necessary to delay this import in case
|
||||||
# database migrations are still running
|
# database migrations are still running
|
||||||
from awx.main.models.ha import Instance
|
from awx.main.models.ha import Instance
|
||||||
|
|||||||
@@ -68,7 +68,7 @@ class TaskBase:
|
|||||||
# initialize each metric to 0 and force metric_has_changed to true. This
|
# initialize each metric to 0 and force metric_has_changed to true. This
|
||||||
# ensures each task manager metric will be overridden when pipe_execute
|
# ensures each task manager metric will be overridden when pipe_execute
|
||||||
# is called later.
|
# is called later.
|
||||||
self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
|
self.subsystem_metrics = s_metrics.DispatcherMetrics(auto_pipe_execute=False)
|
||||||
self.start_time = time.time()
|
self.start_time = time.time()
|
||||||
|
|
||||||
# We want to avoid calling settings in loops, so cache these settings at init time
|
# We want to avoid calling settings in loops, so cache these settings at init time
|
||||||
@@ -105,7 +105,7 @@ class TaskBase:
|
|||||||
try:
|
try:
|
||||||
# increment task_manager_schedule_calls regardless if the other
|
# increment task_manager_schedule_calls regardless if the other
|
||||||
# metrics are recorded
|
# metrics are recorded
|
||||||
s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}__schedule_calls", 1)
|
s_metrics.DispatcherMetrics(auto_pipe_execute=True).inc(f"{self.prefix}__schedule_calls", 1)
|
||||||
# Only record metrics if the last time recording was more
|
# Only record metrics if the last time recording was more
|
||||||
# than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago.
|
# than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago.
|
||||||
# Prevents a short-duration task manager that runs directly after a
|
# Prevents a short-duration task manager that runs directly after a
|
||||||
|
|||||||
@@ -62,7 +62,7 @@ from awx.main.tasks.receptor import get_receptor_ctl, worker_info, worker_cleanu
|
|||||||
from awx.main.consumers import emit_channel_notification
|
from awx.main.consumers import emit_channel_notification
|
||||||
from awx.main import analytics
|
from awx.main import analytics
|
||||||
from awx.conf import settings_registry
|
from awx.conf import settings_registry
|
||||||
from awx.main.analytics.subsystem_metrics import Metrics
|
from awx.main.analytics.subsystem_metrics import DispatcherMetrics
|
||||||
|
|
||||||
from rest_framework.exceptions import PermissionDenied
|
from rest_framework.exceptions import PermissionDenied
|
||||||
|
|
||||||
@@ -113,7 +113,7 @@ def dispatch_startup():
|
|||||||
cluster_node_heartbeat()
|
cluster_node_heartbeat()
|
||||||
reaper.startup_reaping()
|
reaper.startup_reaping()
|
||||||
reaper.reap_waiting(grace_period=0)
|
reaper.reap_waiting(grace_period=0)
|
||||||
m = Metrics()
|
m = DispatcherMetrics()
|
||||||
m.reset_values()
|
m.reset_values()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -20,7 +20,6 @@ from awx.main.analytics.broadcast_websocket import (
|
|||||||
RelayWebsocketStats,
|
RelayWebsocketStats,
|
||||||
RelayWebsocketStatsManager,
|
RelayWebsocketStatsManager,
|
||||||
)
|
)
|
||||||
import awx.main.analytics.subsystem_metrics as s_metrics
|
|
||||||
|
|
||||||
logger = logging.getLogger('awx.main.wsrelay')
|
logger = logging.getLogger('awx.main.wsrelay')
|
||||||
|
|
||||||
@@ -54,7 +53,6 @@ class WebsocketRelayConnection:
|
|||||||
self.protocol = protocol
|
self.protocol = protocol
|
||||||
self.verify_ssl = verify_ssl
|
self.verify_ssl = verify_ssl
|
||||||
self.channel_layer = None
|
self.channel_layer = None
|
||||||
self.subsystem_metrics = s_metrics.Metrics(instance_name=name)
|
|
||||||
self.producers = dict()
|
self.producers = dict()
|
||||||
self.connected = False
|
self.connected = False
|
||||||
|
|
||||||
|
|||||||
@@ -1076,6 +1076,35 @@ HOST_METRIC_SUMMARY_TASK_LAST_TS = None
|
|||||||
HOST_METRIC_SUMMARY_TASK_INTERVAL = 7 # days
|
HOST_METRIC_SUMMARY_TASK_INTERVAL = 7 # days
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: cmeyers, replace with the register pattern
|
||||||
|
# The register pattern is particularly nice for this because we need
|
||||||
|
# to know the process to start the thread that will be the server.
|
||||||
|
# The registration location should be the same location as we would
|
||||||
|
# call MetricsServer.start()
|
||||||
|
# Note: if we don't get to this TODO, then at least create constants
|
||||||
|
# for the services strings below.
|
||||||
|
# TODO: cmeyers, break this out into a separate django app so other
|
||||||
|
# projects can take advantage.
|
||||||
|
|
||||||
|
# Names of the services that expose their own metrics; each name doubles as
# the key of that service's entry in METRICS_SUBSYSTEM_CONFIG['server'].
METRICS_SERVICE_CALLBACK_RECEIVER = 'callback_receiver'
METRICS_SERVICE_DISPATCHER = 'dispatcher'
METRICS_SERVICE_WEBSOCKETS = 'websockets'

# Per-service metrics server configuration: one port per service.
METRICS_SUBSYSTEM_CONFIG = {
    'server': {
        METRICS_SERVICE_CALLBACK_RECEIVER: {
            'port': 8014,
        },
        METRICS_SERVICE_DISPATCHER: {
            'port': 8015,
        },
        METRICS_SERVICE_WEBSOCKETS: {
            'port': 8016,
        },
    }
}
|
||||||
|
|
||||||
|
|
||||||
# django-ansible-base
|
# django-ansible-base
|
||||||
ANSIBLE_BASE_TEAM_MODEL = 'main.Team'
|
ANSIBLE_BASE_TEAM_MODEL = 'main.Team'
|
||||||
ANSIBLE_BASE_ORGANIZATION_MODEL = 'main.Organization'
|
ANSIBLE_BASE_ORGANIZATION_MODEL = 'main.Organization'
|
||||||
|
|||||||
Reference in New Issue
Block a user