From 2f82b757483cf67829a8c0ed843b51d126ec658e Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Fri, 13 May 2022 17:08:46 -0400 Subject: [PATCH] Add subsystem metrics for task manager --- awx/main/analytics/subsystem_metrics.py | 84 ++++-- awx/main/queue.py | 2 - awx/main/scheduler/task_manager.py | 64 ++++- awx/main/tasks/system.py | 3 +- awx/settings/defaults.py | 4 + tools/grafana/dashboards/demo_dashboard.json | 255 ++++++++++++++++++- 6 files changed, 378 insertions(+), 34 deletions(-) diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index c1ad08a0cb..2d6520c33c 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -8,7 +8,7 @@ from django.apps import apps from awx.main.consumers import emit_channel_notification root_key = 'awx_metrics' -logger = logging.getLogger('awx.main.wsbroadcast') +logger = logging.getLogger('awx.main.analytics') class BaseM: @@ -16,16 +16,22 @@ class BaseM: self.field = field self.help_text = help_text self.current_value = 0 + self.metric_has_changed = False - def clear_value(self, conn): + def reset_value(self, conn): conn.hset(root_key, self.field, 0) self.current_value = 0 def inc(self, value): self.current_value += value + self.metric_has_changed = True def set(self, value): self.current_value = value + self.metric_has_changed = True + + def get(self): + return self.current_value def decode(self, conn): value = conn.hget(root_key, self.field) @@ -34,7 +40,7 @@ class BaseM: def to_prometheus(self, instance_data): output_text = f"# HELP {self.field} {self.help_text}\n# TYPE {self.field} gauge\n" for instance in instance_data: - output_text += f'{self.field}{{node="{instance}"}} {instance_data[instance].get(self.field, -1)}\n' # TODO: fix because this -1 is neccessary when dealing with old instances (ex. 
you didn't clean up your database) + output_text += f'{self.field}{{node="{instance}"}} {instance_data[instance][self.field]}\n' return output_text @@ -46,8 +52,10 @@ class FloatM(BaseM): return 0.0 def store_value(self, conn): - conn.hincrbyfloat(root_key, self.field, self.current_value) - self.current_value = 0 + if self.metric_has_changed: + conn.hincrbyfloat(root_key, self.field, self.current_value) + self.current_value = 0 + self.metric_has_changed = False class IntM(BaseM): @@ -58,8 +66,10 @@ class IntM(BaseM): return 0 def store_value(self, conn): - conn.hincrby(root_key, self.field, self.current_value) - self.current_value = 0 + if self.metric_has_changed: + conn.hincrby(root_key, self.field, self.current_value) + self.current_value = 0 + self.metric_has_changed = False class SetIntM(BaseM): @@ -70,10 +80,9 @@ class SetIntM(BaseM): return 0 def store_value(self, conn): - # do not set value if it has not changed since last time this was called - if self.current_value is not None: + if self.metric_has_changed: conn.hset(root_key, self.field, self.current_value) - self.current_value = None + self.metric_has_changed = False class SetFloatM(SetIntM): @@ -94,13 +103,13 @@ class HistogramM(BaseM): self.sum = IntM(field + '_sum', '') super(HistogramM, self).__init__(field, help_text) - def clear_value(self, conn): + def reset_value(self, conn): conn.hset(root_key, self.field, 0) - self.inf.clear_value(conn) - self.sum.clear_value(conn) + self.inf.reset_value(conn) + self.sum.reset_value(conn) for b in self.buckets_to_keys.values(): - b.clear_value(conn) - super(HistogramM, self).clear_value(conn) + b.reset_value(conn) + super(HistogramM, self).reset_value(conn) def observe(self, value): for b in self.buckets: @@ -136,7 +145,7 @@ class HistogramM(BaseM): class Metrics: - def __init__(self, auto_pipe_execute=True, instance_name=None): + def __init__(self, auto_pipe_execute=False, instance_name=None): self.pipe = redis.Redis.from_url(settings.BROKER_URL).pipeline() self.conn = redis.Redis.from_url(settings.BROKER_URL) self.last_pipe_execute = time.time() @@ -152,6 +161,8 @@ class Metrics: Instance = apps.get_model('main', 'Instance') if instance_name: self.instance_name = instance_name + elif settings.IS_TESTING(): + self.instance_name = "awx_testing" else: self.instance_name = Instance.objects.me().hostname @@ -167,10 +178,23 @@ class Metrics: HistogramM( 'callback_receiver_batch_events_insert_db', 'Number of events batch inserted into database', settings.SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS ), + SetFloatM('callback_receiver_event_processing_avg_seconds', 'Average processing time per event per callback receiver batch'), FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'), IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'), FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'), - SetFloatM('callback_receiver_event_processing_avg_seconds', 'Average processing time per event per callback receiver batch'), + SetFloatM('task_manager_get_tasks_seconds', 'Time spent in loading all tasks from db'), + SetFloatM('task_manager_start_task_seconds', 'Time spent starting task'), + SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'), + SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'), + SetFloatM('task_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), + 
SetFloatM('task_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'), + SetFloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'), + IntM('task_manager_schedule_calls', 'Number of calls to task manager schedule'), + SetFloatM('task_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), + SetIntM('task_manager_tasks_started', 'Number of tasks started'), + SetIntM('task_manager_running_processed', 'Number of running tasks processed'), + SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'), + SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'), ] # turn metric list into dictionary with the metric name as a key self.METRICS = {} @@ -180,11 +204,11 @@ class Metrics: # track last time metrics were sent to other nodes self.previous_send_metrics = SetFloatM('send_metrics_time', 'Timestamp of previous send_metrics call') - def clear_values(self, fields=None): - if not fields: - fields = self.METRICS.keys() - for m in fields: - self.METRICS[m].clear_value(self.conn) + def reset_values(self): + # intended to be called once on app startup to reset all metric + # values to 0 + for m in self.METRICS.values(): + m.reset_value(self.conn) self.metrics_have_changed = True self.conn.delete(root_key + "_lock") @@ -192,19 +216,25 @@ class Metrics: if value != 0: self.METRICS[field].inc(value) self.metrics_have_changed = True - if self.auto_pipe_execute is True and self.should_pipe_execute() is True: + if self.auto_pipe_execute is True: self.pipe_execute() def set(self, field, value): self.METRICS[field].set(value) self.metrics_have_changed = True - if self.auto_pipe_execute is True and self.should_pipe_execute() is True: + if self.auto_pipe_execute is True: self.pipe_execute() + def get(self, field): + return self.METRICS[field].get() + + def decode(self, field): + return self.METRICS[field].decode(self.conn) + def observe(self, field, value): self.METRICS[field].observe(value) self.metrics_have_changed = True - if self.auto_pipe_execute is True and self.should_pipe_execute() is True: + if self.auto_pipe_execute is True: self.pipe_execute() def serialize_local_metrics(self): @@ -252,8 +282,8 @@ class Metrics: def send_metrics(self): # more than one thread could be calling this at the same time, so should - # get acquire redis lock before sending metrics - lock = self.conn.lock(root_key + '_lock', thread_local=False) + # acquire redis lock before sending metrics + lock = self.conn.lock(root_key + '_lock') if not lock.acquire(blocking=False): return try: diff --git a/awx/main/queue.py b/awx/main/queue.py index ebac0622e4..26d23a5cbb 100644 --- a/awx/main/queue.py +++ b/awx/main/queue.py @@ -8,7 +8,6 @@ import redis # Django from django.conf import settings -import awx.main.analytics.subsystem_metrics as s_metrics __all__ = ['CallbackQueueDispatcher'] @@ -28,7 +27,6 @@ class CallbackQueueDispatcher(object): self.queue = getattr(settings, 'CALLBACK_QUEUE', '') self.logger = logging.getLogger('awx.main.queue.CallbackQueueDispatcher') self.connection = redis.Redis.from_url(settings.BROKER_URL) - self.subsystem_metrics = s_metrics.Metrics() def dispatch(self, obj): self.connection.rpush(self.queue, json.dumps(obj, cls=AnsibleJSONEncoder)) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 6fa200fcd6..5ca72763d0 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -6,6 +6,9 @@ from datetime import 
timedelta import logging import uuid import json +import time +import sys +import signal # Django from django.db import transaction, connection @@ -38,12 +41,24 @@ from awx.main.constants import ACTIVE_STATES from awx.main.scheduler.dependency_graph import DependencyGraph from awx.main.scheduler.task_manager_models import TaskManagerInstances from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups +import awx.main.analytics.subsystem_metrics as s_metrics from awx.main.utils import decrypt_field logger = logging.getLogger('awx.main.scheduler') +def timeit(func): + def inner(*args, **kwargs): + t_now = time.perf_counter() + result = func(*args, **kwargs) + dur = time.perf_counter() - t_now + args[0].subsystem_metrics.inc("task_manager_" + func.__name__ + "_seconds", dur) + return result + + return inner + + class TaskManager: def __init__(self): """ @@ -62,6 +77,13 @@ class TaskManager: # will no longer be started and will be started on the next task manager cycle. self.start_task_limit = settings.START_TASK_LIMIT self.time_delta_job_explanation = timedelta(seconds=30) + self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False) + # initialize each metric to 0 and force metric_has_changed to true. This + # ensures each task manager metric will be overridden when pipe_execute + # is called later. + for m in self.subsystem_metrics.METRICS: + if m.startswith("task_manager"): + self.subsystem_metrics.set(m, 0) def after_lock_init(self, all_sorted_tasks): """ @@ -100,6 +122,7 @@ class TaskManager: return None + @timeit def get_tasks(self, status_list=('pending', 'waiting', 'running')): jobs = [j for j in Job.objects.filter(status__in=status_list).prefetch_related('instance_group')] inventory_updates_qs = ( @@ -125,6 +148,7 @@ class TaskManager: inventory_ids.add(task.inventory_id) return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)] + @timeit def spawn_workflow_graph_jobs(self, workflow_jobs): for workflow_job in workflow_jobs: if workflow_job.cancel_flag: @@ -231,7 +255,9 @@ class TaskManager: schedule_task_manager() return result + @timeit def start_task(self, task, instance_group, dependent_tasks=None, instance=None): + self.subsystem_metrics.inc("task_manager_tasks_started", 1) self.start_task_limit -= 1 if self.start_task_limit == 0: # schedule another run immediately after this task manager @@ -291,6 +317,7 @@ class TaskManager: task.websocket_emit_status(task.status) # adds to on_commit connection.on_commit(post_commit) + @timeit def process_running_tasks(self, running_tasks): for task in running_tasks: self.dependency_graph.add_job(task) @@ -439,6 +466,7 @@ class TaskManager: latest_src_project_update.scm_inventory_updates.add(inventory_task) return created_dependencies + @timeit def generate_dependencies(self, undeped_tasks): created_dependencies = [] for task in undeped_tasks: @@ -453,6 +481,7 @@ class TaskManager: return created_dependencies + @timeit def process_pending_tasks(self, pending_tasks): running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()} tasks_to_update_job_explanation = [] @@ -461,6 +490,7 @@ class TaskManager: break blocked_by = self.job_blocked_by(task) if blocked_by: + self.subsystem_metrics.inc("task_manager_tasks_blocked", 1) task.log_lifecycle("blocked", blocked_by=blocked_by) job_explanation = gettext_noop(f"waiting for {blocked_by._meta.model_name}-{blocked_by.id} to finish") if task.job_explanation != job_explanation: @@ 
-602,17 +632,22 @@ class TaskManager:
     def process_tasks(self, all_sorted_tasks):
         running_tasks = [t for t in all_sorted_tasks if t.status in ['waiting', 'running']]
         self.process_running_tasks(running_tasks)
+        self.subsystem_metrics.inc("task_manager_running_processed", len(running_tasks))
 
         pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending']
+
         undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed]
         dependencies = self.generate_dependencies(undeped_tasks)
         deps_of_deps = self.generate_dependencies(dependencies)
         dependencies += deps_of_deps
         self.process_pending_tasks(dependencies)
+        self.subsystem_metrics.inc("task_manager_pending_processed", len(dependencies))
+
         self.process_pending_tasks(pending_tasks)
+        self.subsystem_metrics.inc("task_manager_pending_processed", len(pending_tasks))
 
+    @timeit
     def _schedule(self):
         finished_wfjs = []
         all_sorted_tasks = self.get_tasks()
@@ -648,6 +683,28 @@ class TaskManager:
         self.process_tasks(all_sorted_tasks)
         return finished_wfjs
 
+    def record_aggregate_metrics(self, *args):
+        if not settings.IS_TESTING():
+            # increment task_manager_schedule_calls regardless of whether the
+            # other metrics are recorded
+            s_metrics.Metrics(auto_pipe_execute=True).inc("task_manager_schedule_calls", 1)
+            # Only record metrics if the last time they were recorded was more
+            # than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL seconds ago.
+            # This prevents a short task manager run that occurs directly after a
+            # long run from overwriting useful metrics.
+            current_time = time.time()
+            time_last_recorded = current_time - self.subsystem_metrics.decode("task_manager_recorded_timestamp")
+            if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL:
+                logger.debug(f"recording metrics, last recorded {time_last_recorded} seconds ago")
+                self.subsystem_metrics.set("task_manager_recorded_timestamp", current_time)
+                self.subsystem_metrics.pipe_execute()
+            else:
+                logger.debug(f"skipping recording metrics, last recorded {time_last_recorded} seconds ago")
+
+    def record_aggregate_metrics_and_exit(self, *args):
+        self.record_aggregate_metrics()
+        sys.exit(1)
+
     def schedule(self):
         # Lock
         with advisory_lock('task_manager_lock', wait=False) as acquired:
@@ -657,5 +714,8 @@ class TaskManager:
                     return
                 logger.debug("Starting Scheduler")
                 with task_manager_bulk_reschedule():
+                    # if we receive SIGTERM (e.g. due to a timeout), still record metrics
+                    signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit)
                     self._schedule()
+                    self.record_aggregate_metrics()
                 logger.debug("Finishing Scheduler")
diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py
index 8c698609a5..541415f2b8 100644
--- a/awx/main/tasks/system.py
+++ b/awx/main/tasks/system.py
@@ -103,7 +103,8 @@ def dispatch_startup():
     # apply_cluster_membership_policies()
     cluster_node_heartbeat()
-    Metrics().clear_values()
+    m = Metrics()
+    m.reset_values()
     # Update Tower's rsyslog.conf file based on loggins settings in the db
     reconfigure_rsyslog()
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index 24b4ca79ff..ef389e5151 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -241,6 +241,10 @@ SUBSYSTEM_METRICS_INTERVAL_SEND_METRICS = 3
 # Interval in seconds for saving local metrics to redis
 SUBSYSTEM_METRICS_INTERVAL_SAVE_TO_REDIS = 2
+# Record task manager metrics at the following interval in seconds
+# If using Prometheus, this should be >= the Prometheus scrape interval
+SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL =
15 + # The maximum allowed jobs to start on a given task manager cycle START_TASK_LIMIT = 100 diff --git a/tools/grafana/dashboards/demo_dashboard.json b/tools/grafana/dashboards/demo_dashboard.json index 0294fa4168..7d604a10f2 100644 --- a/tools/grafana/dashboards/demo_dashboard.json +++ b/tools/grafana/dashboards/demo_dashboard.json @@ -89,6 +89,256 @@ "x": 0, "y": 0 }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "task_manager_running_processed", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "task_manager_pending_processed", + "hide": false, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "task_manager_tasks_blocked", + "hide": false, + "refId": "D" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "task_manager_tasks_started", + "hide": false, + "refId": "C" + } + ], + "title": "Task manager workload", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 10, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "builder", + "expr": "task_manager_process_pending_tasks_seconds", + "legendFormat": "__auto", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "builder", + "expr": "task_manager_process_running_tasks_seconds", + "hide": false, + "legendFormat": "__auto", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "builder", + "expr": "task_manager_generate_dependencies_seconds", + "hide": false, + "legendFormat": "__auto", + "range": true, + "refId": "C" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "builder", + "expr": "task_manager_get_tasks_seconds", + "hide": false, + "legendFormat": "__auto", + "range": true, + "refId": "D" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "builder", + "expr": "task_manager_spawn_workflow_graph_jobs_seconds", + "hide": false, + "legendFormat": "__auto", + "range": true, + "refId": "E" + } + ], + "title": 
"Task manager timings", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 16 + }, "id": 8, "options": { "legend": { @@ -115,6 +365,7 @@ "type": "timeseries" } ], + "refresh": "5s", "schemaVersion": 36, "style": "dark", "tags": [], @@ -122,13 +373,13 @@ "list": [] }, "time": { - "from": "now-30m", + "from": "now-5m", "to": "now" }, "timepicker": {}, "timezone": "", "title": "awx-demo", "uid": "GISWZOXnk", - "version": 6, + "version": 2, "weekStart": "" }