Merge pull request #9461 from fosterseth/feat_metrics_via_redis

Add subsystem metrics that propagate through Redis

SUMMARY

#9019 -- list of metrics and their purpose / description
#9012
#9056
#8629
Use Redis to store metrics pertaining to the performance and health of subsystems such as the callback receiver and task manager. It is thread / multiprocess safe and should be fast enough to handle a high volume of data.
This data shows up at the /api/v2/metrics endpoint
You can filter down nodes using /api/v2/metrics/?subsystemonly=1&node=awx-1
You can also filter down to a specific metric,
/api/v2/metrics/?subsystemonly=1&metric=callback_receiver_events_insert_db_seconds&node=awx-1

ISSUE TYPE


Feature Pull Request

COMPONENT NAME


API

AWX VERSION

awx: 17.0.1

Reviewed-by: Ryan Petrello <None>
Reviewed-by: Chris Meyers <None>
Reviewed-by: Seth Foster <None>
Reviewed-by: Elijah DeLee <kdelee@redhat.com>
This commit is contained in:
softwarefactory-project-zuul[bot] 2021-03-25 19:54:21 +00:00 committed by GitHub
commit 6087d5cb9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 410 additions and 20 deletions

View File

@ -129,6 +129,18 @@ class PrometheusJSONRenderer(renderers.JSONRenderer):
parsed_metrics = text_string_to_metric_families(data)
data = {}
for family in parsed_metrics:
data[family.name] = {}
data[family.name]['help_text'] = family.documentation
data[family.name]['type'] = family.type
data[family.name]['samples'] = []
for sample in family.samples:
data[sample[0]] = {"labels": sample[1], "value": sample[2]}
sample_dict = {"labels": sample[1], "value": sample[2]}
if family.type == 'histogram':
if sample[0].endswith("_sum"):
sample_dict['sample_type'] = "sum"
elif sample[0].endswith("_count"):
sample_dict['sample_type'] = "count"
elif sample[0].endswith("_bucket"):
sample_dict['sample_type'] = "bucket"
data[family.name]['samples'].append(sample_dict)
return super(PrometheusJSONRenderer, self).render(data, accepted_media_type, renderer_context)

View File

@ -0,0 +1 @@
query params to filter response, e.g., ?subsystemonly=1&metric=callback_receiver_events_insert_db&node=awx-1

View File

@ -14,6 +14,7 @@ from rest_framework.exceptions import PermissionDenied
# AWX
# from awx.main.analytics import collectors
import awx.main.analytics.subsystem_metrics as s_metrics
from awx.main.analytics.metrics import metrics
from awx.api import renderers
@ -33,5 +34,10 @@ class MetricsView(APIView):
def get(self, request):
''' Show Metrics Details '''
# Only superusers and system auditors receive a Response; any other user
# falls through to the PermissionDenied raised on the last line.
if request.user.is_superuser or request.user.is_system_auditor:
# NOTE(review): the next line looks like the pre-change statement kept by
# the diff view; read literally it would return before the query-param
# handling below is reached -- confirm against the real file.
return Response(metrics().decode('UTF-8'))
metrics_to_show = ''
# ?subsystemonly=1 leaves out the text produced by metrics()
if not request.query_params.get('subsystemonly', "0") == "1":
metrics_to_show += metrics().decode('UTF-8')
# ?dbonly=1 leaves out the text produced by s_metrics.metrics(request)
if not request.query_params.get('dbonly', "0") == "1":
metrics_to_show += s_metrics.metrics(request)
return Response(metrics_to_show)
raise PermissionDenied()

View File

@ -0,0 +1,14 @@
# Python
import logging
# AWX
from awx.main.analytics.subsystem_metrics import Metrics
from awx.main.dispatch.publish import task
from awx.main.dispatch import get_local_queuename
logger = logging.getLogger('awx.main.scheduler')
@task(queue=get_local_queuename)
def send_subsystem_metrics():
    """Dispatcher task: build a Metrics object and ask it to send its metrics."""
    collector = Metrics()
    collector.send_metrics()

View File

@ -0,0 +1,304 @@
import redis
import json
import time
import logging
from django.conf import settings
from django.apps import apps
from awx.main.consumers import emit_channel_notification
root_key = 'awx_metrics'
logger = logging.getLogger('awx.main.wsbroadcast')
class BaseM:
    """Base class for one named metric kept as a field of the ``root_key`` redis hash.

    Changes are accumulated locally in ``current_value``. Subclasses provide
    ``store_value`` (write the accumulated value through a redis connection or
    pipeline) and ``decode_value`` (turn the raw ``hget`` reply into a number).
    """

    def __init__(self, field, help_text):
        self.field = field
        self.help_text = help_text
        self.current_value = 0

    def clear_value(self, conn):
        """Reset the metric to zero both in redis and in the local accumulator."""
        conn.hset(root_key, self.field, 0)
        self.current_value = 0

    def inc(self, value):
        """Add ``value`` to the locally accumulated amount."""
        self.current_value += value

    def set(self, value):
        """Replace the locally held value with ``value``."""
        self.current_value = value

    def decode_value(self, value):
        """Convert a raw redis reply into a Python number (subclass hook)."""
        raise NotImplementedError(f"{type(self).__name__} must implement decode_value()")

    def store_value(self, conn):
        """Write the locally held value to redis (subclass hook)."""
        raise NotImplementedError(f"{type(self).__name__} must implement store_value()")

    def decode(self, conn):
        """Read this metric's field from redis and decode it."""
        return self.decode_value(conn.hget(root_key, self.field))

    def to_prometheus(self, instance_data):
        """Render the metric as Prometheus text, one gauge sample per instance.

        ``instance_data`` maps an instance name to a mapping that holds this
        metric's value under ``self.field``.
        """
        parts = [f"# HELP {self.field} {self.help_text}\n# TYPE {self.field} gauge\n"]
        parts.extend(f'{self.field}{{node="{instance}"}} {instance_data[instance][self.field]}\n' for instance in instance_data)
        return "".join(parts)
class FloatM(BaseM):
    """Float metric whose locally accumulated amount is added onto the redis value."""

    def decode_value(self, value):
        # a None reply is reported as zero
        return 0.0 if value is None else float(value)

    def store_value(self, conn):
        # hincrbyfloat adds the pending amount to the stored field, after
        # which the local accumulator starts again from zero
        pending = self.current_value
        conn.hincrbyfloat(root_key, self.field, pending)
        self.current_value = 0
class IntM(BaseM):
    """Integer metric whose locally accumulated amount is added onto the redis value."""

    def decode_value(self, value):
        # a None reply is reported as zero
        return 0 if value is None else int(value)

    def store_value(self, conn):
        # hincrby adds the pending amount to the stored field, after which
        # the local accumulator starts again from zero
        pending = self.current_value
        conn.hincrby(root_key, self.field, pending)
        self.current_value = 0
class SetIntM(BaseM):
    """Integer metric that overwrites the redis field instead of incrementing it."""

    def decode_value(self, value):
        # a None reply is reported as zero
        return 0 if value is None else int(value)

    def store_value(self, conn):
        # None marks "nothing new since the last store", so there is nothing to write
        if self.current_value is None:
            return
        conn.hset(root_key, self.field, self.current_value)
        self.current_value = None
class SetFloatM(SetIntM):
    """Float variant of SetIntM: same overwrite-on-store behaviour, float decoding."""

    def decode_value(self, value):
        # Return a float in every case (0.0, not the int 0, for a None reply)
        # so callers always get the same type, matching FloatM.
        if value is None:
            return 0.0
        return float(value)
class HistogramM(BaseM):
    """Histogram metric built from IntM counters.

    One counter exists per bucket upper bound (redis field ``<field>_<bound>``),
    plus ``<field>_inf`` (incremented on every observation) and ``<field>_sum``
    (sum of the observed values). Bucket counters are stored non-cumulatively;
    ``to_prometheus`` turns them into cumulative ``le`` buckets.
    """

    def __init__(self, field, help_text, buckets):
        self.buckets = buckets
        self.buckets_to_keys = {b: IntM(field + '_' + str(b), '') for b in buckets}
        self.inf = IntM(field + '_inf', '')
        self.sum = IntM(field + '_sum', '')
        super(HistogramM, self).__init__(field, help_text)

    def clear_value(self, conn):
        """Zero every counter of the histogram in redis and locally."""
        self.inf.clear_value(conn)
        self.sum.clear_value(conn)
        for counter in self.buckets_to_keys.values():
            counter.clear_value(conn)
        # the base implementation zeroes the histogram's own field, so no
        # separate hset for self.field is needed here
        super(HistogramM, self).clear_value(conn)

    def observe(self, value):
        """Record one observation of ``value``."""
        # only the first bucket whose bound is >= value is incremented
        for b in self.buckets:
            if value <= b:
                self.buckets_to_keys[b].inc(1)
                break
        self.sum.inc(value)
        self.inf.inc(1)

    def decode(self, conn):
        """Read all counters from redis into ``{'counts': [...], 'sum': ..., 'inf': ...}``."""
        return {
            'counts': [counter.decode(conn) for counter in self.buckets_to_keys.values()],
            'sum': self.sum.decode(conn),
            'inf': self.inf.decode(conn),
        }

    def store_value(self, conn):
        """Write every pending counter increment through ``conn``."""
        for b in self.buckets:
            self.buckets_to_keys[b].store_value(conn)
        self.sum.store_value(conn)
        self.inf.store_value(conn)

    def to_prometheus(self, instance_data):
        """Render the histogram as Prometheus text for every instance in ``instance_data``.

        ``instance_data[instance][self.field]`` is expected to have the shape
        produced by ``decode``.
        """
        parts = [f"# HELP {self.field} {self.help_text}\n# TYPE {self.field} histogram\n"]
        for instance in instance_data:
            hist = instance_data[instance][self.field]
            counts = hist["counts"]
            for i, b in enumerate(self.buckets):
                # stored counts are per-bucket; the "le" sample is the running total
                parts.append(f'{self.field}_bucket{{le="{b}",node="{instance}"}} {sum(counts[0:i+1])}\n')
            parts.append(f'{self.field}_bucket{{le="+Inf",node="{instance}"}} {hist["inf"]}\n')
            parts.append(f'{self.field}_count{{node="{instance}"}} {hist["inf"]}\n')
            parts.append(f'{self.field}_sum{{node="{instance}"}} {hist["sum"]}\n')
        return "".join(parts)
class Metrics:
    """Collect subsystem metrics, save them to redis and exchange them between nodes.

    Metric changes are buffered on the metric objects and written to redis
    through a pipeline by ``pipe_execute``. ``send_metrics`` serializes the
    values stored in redis and emits them on the "metrics" channel;
    ``store_metrics`` saves a payload under ``<root_key>_instance_<name>``,
    which is what ``generate_metrics`` reads to build the Prometheus text.
    """

    def __init__(self, auto_pipe_execute=True):
        self.pipe = redis.Redis.from_url(settings.BROKER_URL).pipeline()
        self.conn = redis.Redis.from_url(settings.BROKER_URL)
        self.last_pipe_execute = time.time()
        # track if metrics have been modified since last saved to redis
        # start with True so that we get an initial save to redis
        self.metrics_have_changed = True
        self.pipe_execute_interval = settings.SUBSYSTEM_METRICS_INTERVAL_SAVE_TO_REDIS
        self.send_metrics_interval = settings.SUBSYSTEM_METRICS_INTERVAL_SEND_METRICS
        # auto pipe execute will commit transaction of metric data to redis
        # at a regular interval (pipe_execute_interval). If set to False,
        # the calling function should call .pipe_execute() explicitly
        self.auto_pipe_execute = auto_pipe_execute
        Instance = apps.get_model('main', 'Instance')
        self.instance_name = Instance.objects.me().hostname

        # metric name, help_text
        METRICSLIST = [
            SetIntM('callback_receiver_events_queue_size_redis', 'Current number of events in redis queue'),
            IntM('callback_receiver_events_popped_redis', 'Number of events popped from redis'),
            IntM('callback_receiver_events_in_memory', 'Current number of events in memory (in transfer from redis to db)'),
            IntM('callback_receiver_batch_events_errors', 'Number of times batch insertion failed'),
            FloatM('callback_receiver_events_insert_db_seconds', 'Time spent saving events to database'),
            IntM('callback_receiver_events_insert_db', 'Number of events batch inserted into database'),
            HistogramM(
                'callback_receiver_batch_events_insert_db', 'Number of events batch inserted into database', settings.SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS
            ),
            FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'),
            IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'),
            FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'),
        ]
        # turn metric list into dictionary with the metric name as a key
        self.METRICS = {m.field: m for m in METRICSLIST}

        # track last time metrics were sent to other nodes
        self.previous_send_metrics = SetFloatM('send_metrics_time', 'Timestamp of previous send_metrics call')

    def _pipe_execute_if_due(self):
        # shared tail of inc/set/observe: flush to redis only in auto mode and
        # only once should_pipe_execute() reports that a save is due
        if self.auto_pipe_execute is True and self.should_pipe_execute() is True:
            self.pipe_execute()

    def clear_values(self):
        """Zero every metric in redis and delete the send-metrics lock key."""
        for m in self.METRICS.values():
            m.clear_value(self.conn)
        self.metrics_have_changed = True
        self.conn.delete(root_key + "_lock")

    def inc(self, field, value):
        """Add ``value`` to metric ``field``; a zero increment is ignored."""
        if value != 0:
            self.METRICS[field].inc(value)
            self.metrics_have_changed = True
            self._pipe_execute_if_due()

    def set(self, field, value):
        """Set metric ``field`` to ``value``."""
        self.METRICS[field].set(value)
        self.metrics_have_changed = True
        self._pipe_execute_if_due()

    def observe(self, field, value):
        """Record one observation of ``value`` on histogram metric ``field``."""
        self.METRICS[field].observe(value)
        self.metrics_have_changed = True
        self._pipe_execute_if_due()

    def serialize_local_metrics(self):
        """Return the metrics currently stored in redis as a JSON string."""
        return json.dumps(self.load_local_metrics())

    def load_local_metrics(self):
        # generate python dictionary of key values from metrics stored in redis
        return {field: metric.decode(self.conn) for field, metric in self.METRICS.items()}

    def store_metrics(self, data_json):
        # called when receiving metrics from other instances; send_metrics also
        # calls it with this node's own payload to keep a local copy
        data = json.loads(data_json)
        if self.instance_name != data['instance']:
            logger.debug(f"{self.instance_name} received subsystem metrics from {data['instance']}")
        self.conn.set(root_key + "_instance_" + data['instance'], data['metrics'])

    def should_pipe_execute(self):
        """Return True when metrics changed and the save interval has elapsed."""
        if self.metrics_have_changed is False:
            return False
        return time.time() - self.last_pipe_execute > self.pipe_execute_interval

    def pipe_execute(self):
        """Write pending metric changes to redis, then try to send metrics."""
        if self.metrics_have_changed is not True:
            return
        duration_to_save = time.perf_counter()
        for m in self.METRICS:
            self.METRICS[m].store_value(self.pipe)
        self.pipe.execute()
        self.last_pipe_execute = time.time()
        self.metrics_have_changed = False
        duration_to_save = time.perf_counter() - duration_to_save
        # these self-measurements are only buffered here; they reach redis on
        # a later pipe_execute call
        self.METRICS['subsystem_metrics_pipe_execute_seconds'].inc(duration_to_save)
        self.METRICS['subsystem_metrics_pipe_execute_calls'].inc(1)

        duration_to_save = time.perf_counter()
        self.send_metrics()
        duration_to_save = time.perf_counter() - duration_to_save
        self.METRICS['subsystem_metrics_send_metrics_seconds'].inc(duration_to_save)

    def send_metrics(self):
        # more than one thread could be calling this at the same time, so
        # acquire a redis lock before sending metrics; if the lock is already
        # held, skip this round instead of waiting for it
        lock = self.conn.lock(root_key + '_lock', thread_local=False)
        if not lock.acquire(blocking=False):
            return
        try:
            current_time = time.time()
            if current_time - self.previous_send_metrics.decode(self.conn) > self.send_metrics_interval:
                payload = {
                    'instance': self.instance_name,
                    'metrics': self.serialize_local_metrics(),
                }
                # store a local copy as well
                self.store_metrics(json.dumps(payload))
                emit_channel_notification("metrics", payload)
                self.previous_send_metrics.set(current_time)
                self.previous_send_metrics.store_value(self.conn)
        finally:
            lock.release()

    def load_other_metrics(self, request):
        # data received from other nodes are stored in their own keys
        # e.g., awx_metrics_instance_awx-1, awx_metrics_instance_awx-2
        # this method looks for keys with "_instance_" in the name and loads the data
        # also filters data based on request query params
        # if additional filtering is added, update metrics_view.md
        instances_filter = request.query_params.getlist("node")
        key_prefix = root_key + '_instance_'
        # Collect names in a set: the local instance normally has its own
        # "_instance_" key too, and would otherwise be listed and fetched twice.
        instance_names = {self.instance_name}
        for key in self.conn.scan_iter(key_prefix + '*'):
            # strip the known prefix rather than splitting on '_instance_', so
            # a hostname that itself contains '_instance_' is kept intact
            instance_names.add(key.decode('UTF-8')[len(key_prefix):])
        # load data, including data from this local instance, in sorted name order
        instance_data = {}
        for instance in sorted(instance_names):
            if not instances_filter or instance in instances_filter:
                instance_data_from_redis = self.conn.get(key_prefix + instance)
                # data from other instances may not be available. That is OK.
                if instance_data_from_redis:
                    instance_data[instance] = json.loads(instance_data_from_redis.decode('UTF-8'))
        return instance_data

    def generate_metrics(self, request):
        # takes the api request, filters, and generates prometheus data
        # if additional filtering is added, update metrics_view.md
        instance_data = self.load_other_metrics(request)
        metrics_filter = request.query_params.getlist("metric")
        if not instance_data:
            return ''
        return ''.join(metric.to_prometheus(instance_data) for field, metric in self.METRICS.items() if not metrics_filter or field in metrics_filter)
def metrics(request):
    """Build a Metrics object and return its generated metrics text for ``request``."""
    return Metrics().generate_metrics(request)

View File

@ -13,7 +13,6 @@ from channels.generic.websocket import AsyncJsonWebsocketConsumer
from channels.layers import get_channel_layer
from channels.db import database_sync_to_async
logger = logging.getLogger('awx.main.consumers')
XRF_KEY = '_auth_user_xrf'

View File

@ -20,7 +20,7 @@ from awx.main.models import JobEvent, AdHocCommandEvent, ProjectUpdateEvent, Inv
from awx.main.tasks import handle_success_and_failure_notifications
from awx.main.models.events import emit_event_detail
from awx.main.utils.profiling import AWXProfiler
import awx.main.analytics.subsystem_metrics as s_metrics
from .base import BaseWorker
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
@ -46,16 +46,22 @@ class CallbackBrokerWorker(BaseWorker):
self.buff = {}
self.pid = os.getpid()
self.redis = redis.Redis.from_url(settings.BROKER_URL)
self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
self.queue_pop = 0
self.queue_name = settings.CALLBACK_QUEUE
self.prof = AWXProfiler("CallbackBrokerWorker")
for key in self.redis.keys('awx_callback_receiver_statistics_*'):
self.redis.delete(key)
def read(self, queue):
try:
res = self.redis.blpop(settings.CALLBACK_QUEUE, timeout=1)
res = self.redis.blpop(self.queue_name, timeout=1)
if res is None:
return {'event': 'FLUSH'}
self.total += 1
self.queue_pop += 1
self.subsystem_metrics.inc('callback_receiver_events_popped_redis', 1)
self.subsystem_metrics.inc('callback_receiver_events_in_memory', 1)
return json.loads(res[1])
except redis.exceptions.RedisError:
logger.exception("encountered an error communicating with redis")
@ -64,8 +70,19 @@ class CallbackBrokerWorker(BaseWorker):
logger.exception("failed to decode JSON message from redis")
finally:
self.record_statistics()
self.record_read_metrics()
return {'event': 'FLUSH'}
def record_read_metrics(self):
# Nothing to record when no event was popped since the counter was last reset.
if self.queue_pop == 0:
return
# Only when the metrics object reports that a save is due: read the current
# length of the callback queue, set it on the queue-size metric and flush.
if self.subsystem_metrics.should_pipe_execute() is True:
queue_size = self.redis.llen(self.queue_name)
self.subsystem_metrics.set('callback_receiver_events_queue_size_redis', queue_size)
self.subsystem_metrics.pipe_execute()
# NOTE(review): indentation is lost in this view; presumably this reset
# belongs to the branch above -- confirm against the real file.
self.queue_pop = 0
def record_statistics(self):
# buffer stat recording to once per (by default) 5s
if time.time() - self.last_stats > settings.JOB_EVENT_STATISTICS_INTERVAL:
@ -99,27 +116,44 @@ class CallbackBrokerWorker(BaseWorker):
def flush(self, force=False):
# Save buffered events (self.buff maps a model class to a list of events)
# and update the subsystem metrics for what was saved.
now = tz_now()
# Flush when forced, when JOB_EVENT_BUFFER_SECONDS have passed since the last
# flush, or when any per-class buffer holds 1000 or more events.
if force or (time.time() - self.last_flush) > settings.JOB_EVENT_BUFFER_SECONDS or any([len(events) >= 1000 for events in self.buff.values()]):
# counters that feed the subsystem metrics further down
bulk_events_saved = 0
singular_events_saved = 0
metrics_events_batch_save_errors = 0
for cls, events in self.buff.items():
logger.debug(f'{cls.__name__}.objects.bulk_create({len(events)})')
for e in events:
if not e.created:
e.created = now
e.modified = now
# NOTE(review): duration_to_save is reassigned for every class in self.buff,
# so presumably only the last class's save time reaches the
# callback_receiver_events_insert_db_seconds metric -- confirm.
duration_to_save = time.perf_counter()
try:
cls.objects.bulk_create(events)
bulk_events_saved += len(events)
except Exception:
# if an exception occurs, we should re-attempt to save the
# events one-by-one, because something in the list is
# broken/stale
metrics_events_batch_save_errors += 1
for e in events:
try:
e.save()
singular_events_saved += 1
except Exception:
logger.exception('Database Error Saving Job Event')
duration_to_save = time.perf_counter() - duration_to_save
for e in events:
emit_event_detail(e)
self.buff = {}
self.last_flush = time.time()
# only update metrics if we saved events
if (bulk_events_saved + singular_events_saved) > 0:
self.subsystem_metrics.inc('callback_receiver_batch_events_errors', metrics_events_batch_save_errors)
self.subsystem_metrics.inc('callback_receiver_events_insert_db_seconds', duration_to_save)
self.subsystem_metrics.inc('callback_receiver_events_insert_db', bulk_events_saved + singular_events_saved)
# the histogram observes only the bulk-saved count, not the one-by-one saves
self.subsystem_metrics.observe('callback_receiver_batch_events_insert_db', bulk_events_saved)
# saved events are subtracted from the in-memory metric
self.subsystem_metrics.inc('callback_receiver_events_in_memory', -(bulk_events_saved + singular_events_saved))
# this worker's Metrics object is created with auto_pipe_execute=False, so
# the save to redis is triggered explicitly here
if self.subsystem_metrics.should_pipe_execute() is True:
self.subsystem_metrics.pipe_execute()
def perform_work(self, body):
try:
@ -169,6 +203,7 @@ class CallbackBrokerWorker(BaseWorker):
except Exception:
logger.exception('Worker failed to emit notifications: Job {}'.format(job_identifier))
finally:
self.subsystem_metrics.inc('callback_receiver_events_in_memory', -1)
GuidMiddleware.set_guid('')
return

View File

@ -8,7 +8,7 @@ import redis
# Django
from django.conf import settings
import awx.main.analytics.subsystem_metrics as s_metrics
__all__ = ['CallbackQueueDispatcher']
@ -28,6 +28,7 @@ class CallbackQueueDispatcher(object):
self.queue = getattr(settings, 'CALLBACK_QUEUE', '')
self.logger = logging.getLogger('awx.main.queue.CallbackQueueDispatcher')
self.connection = redis.Redis.from_url(settings.BROKER_URL)
self.subsystem_metrics = s_metrics.Metrics()
def dispatch(self, obj):
self.connection.rpush(self.queue, json.dumps(obj, cls=AnsibleJSONEncoder))

View File

@ -107,6 +107,7 @@ from awx.main.consumers import emit_channel_notification
from awx.main import analytics
from awx.conf import settings_registry
from awx.conf.license import get_license
from awx.main.analytics.subsystem_metrics import Metrics
from rest_framework.exceptions import PermissionDenied
@ -170,6 +171,7 @@ def dispatch_startup():
cluster_node_heartbeat()
if Instance.objects.me().is_controller():
awx_isolated_heartbeat()
Metrics().clear_values()
# Update Tower's rsyslog.conf file based on logging settings in the db
reconfigure_rsyslog()

View File

@ -56,24 +56,28 @@ def test_metrics_counts(organization_factory, job_template_factory, workflow_job
assert EXPECTED_VALUES[name] == value
def get_metrics_view_db_only():
    """Return the metrics view URL with the ``dbonly=1`` query parameter appended."""
    base_url = reverse('api:metrics_view')
    return base_url + '?dbonly=1'
@pytest.mark.django_db
def test_metrics_permissions(get, admin, org_admin, alice, bob, organization):
# Checks which users may GET the metrics view: 200 for admin, 403 for
# org_admin, alice and bob; bob still gets 403 as an organization auditor and
# 200 once he is a system auditor.
# NOTE(review): each check appears twice, once against
# reverse('api:metrics_view') and once against get_metrics_view_db_only();
# this looks like the diff view showing old and new lines together -- confirm.
assert get(reverse('api:metrics_view'), user=admin).status_code == 200
assert get(reverse('api:metrics_view'), user=org_admin).status_code == 403
assert get(reverse('api:metrics_view'), user=alice).status_code == 403
assert get(reverse('api:metrics_view'), user=bob).status_code == 403
assert get(get_metrics_view_db_only(), user=admin).status_code == 200
assert get(get_metrics_view_db_only(), user=org_admin).status_code == 403
assert get(get_metrics_view_db_only(), user=alice).status_code == 403
assert get(get_metrics_view_db_only(), user=bob).status_code == 403
organization.auditor_role.members.add(bob)
assert get(reverse('api:metrics_view'), user=bob).status_code == 403
assert get(get_metrics_view_db_only(), user=bob).status_code == 403
Role.singleton('system_auditor').members.add(bob)
bob.is_system_auditor = True
assert get(reverse('api:metrics_view'), user=bob).status_code == 200
assert get(get_metrics_view_db_only(), user=bob).status_code == 200
@pytest.mark.django_db
def test_metrics_http_methods(get, post, patch, put, options, admin):
# For an admin user, GET and OPTIONS on the metrics view return 200 while
# PUT, PATCH and POST return 405.
# NOTE(review): each check appears twice, once against
# reverse('api:metrics_view') and once against get_metrics_view_db_only();
# this looks like the diff view showing old and new lines together -- confirm.
assert get(reverse('api:metrics_view'), user=admin).status_code == 200
assert put(reverse('api:metrics_view'), user=admin).status_code == 405
assert patch(reverse('api:metrics_view'), user=admin).status_code == 405
assert post(reverse('api:metrics_view'), user=admin).status_code == 405
assert options(reverse('api:metrics_view'), user=admin).status_code == 200
assert get(get_metrics_view_db_only(), user=admin).status_code == 200
assert put(get_metrics_view_db_only(), user=admin).status_code == 405
assert patch(get_metrics_view_db_only(), user=admin).status_code == 405
assert post(get_metrics_view_db_only(), user=admin).status_code == 405
assert options(get_metrics_view_db_only(), user=admin).status_code == 200

View File

@ -15,7 +15,7 @@ from awx.main.analytics.broadcast_websocket import (
BroadcastWebsocketStats,
BroadcastWebsocketStatsManager,
)
import awx.main.analytics.subsystem_metrics as s_metrics
logger = logging.getLogger('awx.main.wsbroadcast')
@ -68,6 +68,7 @@ class WebsocketTask:
self.protocol = protocol
self.verify_ssl = verify_ssl
self.channel_layer = None
self.subsystem_metrics = s_metrics.Metrics()
async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse):
raise RuntimeError("Implement me")
@ -144,9 +145,10 @@ class BroadcastWebsocketTask(WebsocketTask):
logmsg = "{} {}".format(logmsg, payload)
logger.warn(logmsg)
continue
(group, message) = unwrap_broadcast_msg(payload)
if group == "metrics":
self.subsystem_metrics.store_metrics(message)
continue
await self.channel_layer.group_send(group, {"type": "internal.message", "text": message})

View File

@ -224,6 +224,15 @@ JOB_EVENT_MAX_QUEUE_SIZE = 10000
# The number of job events to migrate per-transaction when moving from int -> bigint
JOB_EVENT_MIGRATION_CHUNK_SIZE = 1000000
# Histogram buckets for the callback_receiver_batch_events_insert_db metric
SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS = [10, 50, 150, 350, 650, 2000]
# Interval in seconds for sending local metrics to other nodes
SUBSYSTEM_METRICS_INTERVAL_SEND_METRICS = 3
# Interval in seconds for saving local metrics to redis
SUBSYSTEM_METRICS_INTERVAL_SAVE_TO_REDIS = 2
# The maximum allowed jobs to start on a given task manager cycle
START_TASK_LIMIT = 100
@ -427,6 +436,7 @@ CELERYBEAT_SCHEDULE = {
'gather_analytics': {'task': 'awx.main.tasks.gather_analytics', 'schedule': timedelta(minutes=5)},
'task_manager': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
'k8s_reaper': {'task': 'awx.main.tasks.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}},
'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)},
# 'isolated_heartbeat': set up at the end of production.py and development.py
}