Add subsystem metrics for task manager

This commit is contained in:
Seth Foster
2022-05-13 17:08:46 -04:00
parent 705f86f8cf
commit 2f82b75748
6 changed files with 378 additions and 34 deletions

View File

@@ -6,6 +6,9 @@ from datetime import timedelta
import logging
import uuid
import json
import time
import sys
import signal
# Django
from django.db import transaction, connection
@@ -38,12 +41,24 @@ from awx.main.constants import ACTIVE_STATES
from awx.main.scheduler.dependency_graph import DependencyGraph
from awx.main.scheduler.task_manager_models import TaskManagerInstances
from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups
import awx.main.analytics.subsystem_metrics as s_metrics
from awx.main.utils import decrypt_field
logger = logging.getLogger('awx.main.scheduler')
def timeit(func):
def inner(*args, **kwargs):
t_now = time.perf_counter()
result = func(*args, **kwargs)
dur = time.perf_counter() - t_now
args[0].subsystem_metrics.inc("task_manager_" + func.__name__ + "_seconds", dur)
return result
return inner
class TaskManager:
def __init__(self):
"""
@@ -62,6 +77,13 @@ class TaskManager:
# will no longer be started and will be started on the next task manager cycle.
self.start_task_limit = settings.START_TASK_LIMIT
self.time_delta_job_explanation = timedelta(seconds=30)
self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
# initialize each metric to 0 and force metric_has_changed to true. This
# ensures each task manager metric will be overridden when pipe_execute
# is called later.
for m in self.subsystem_metrics.METRICS:
if m.startswith("task_manager"):
self.subsystem_metrics.set(m, 0)
def after_lock_init(self, all_sorted_tasks):
"""
@@ -100,6 +122,7 @@ class TaskManager:
return None
@timeit
def get_tasks(self, status_list=('pending', 'waiting', 'running')):
jobs = [j for j in Job.objects.filter(status__in=status_list).prefetch_related('instance_group')]
inventory_updates_qs = (
@@ -125,6 +148,7 @@ class TaskManager:
inventory_ids.add(task.inventory_id)
return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)]
@timeit
def spawn_workflow_graph_jobs(self, workflow_jobs):
for workflow_job in workflow_jobs:
if workflow_job.cancel_flag:
@@ -231,7 +255,9 @@ class TaskManager:
schedule_task_manager()
return result
@timeit
def start_task(self, task, instance_group, dependent_tasks=None, instance=None):
self.subsystem_metrics.inc("task_manager_tasks_started", 1)
self.start_task_limit -= 1
if self.start_task_limit == 0:
# schedule another run immediately after this task manager
@@ -291,6 +317,7 @@ class TaskManager:
task.websocket_emit_status(task.status) # adds to on_commit
connection.on_commit(post_commit)
@timeit
def process_running_tasks(self, running_tasks):
for task in running_tasks:
self.dependency_graph.add_job(task)
@@ -439,6 +466,7 @@ class TaskManager:
latest_src_project_update.scm_inventory_updates.add(inventory_task)
return created_dependencies
@timeit
def generate_dependencies(self, undeped_tasks):
created_dependencies = []
for task in undeped_tasks:
@@ -453,6 +481,7 @@ class TaskManager:
return created_dependencies
@timeit
def process_pending_tasks(self, pending_tasks):
running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()}
tasks_to_update_job_explanation = []
@@ -461,6 +490,7 @@ class TaskManager:
break
blocked_by = self.job_blocked_by(task)
if blocked_by:
self.subsystem_metrics.inc("task_manager_tasks_blocked", 1)
task.log_lifecycle("blocked", blocked_by=blocked_by)
job_explanation = gettext_noop(f"waiting for {blocked_by._meta.model_name}-{blocked_by.id} to finish")
if task.job_explanation != job_explanation:
@@ -602,17 +632,22 @@ class TaskManager:
def process_tasks(self, all_sorted_tasks):
running_tasks = [t for t in all_sorted_tasks if t.status in ['waiting', 'running']]
self.process_running_tasks(running_tasks)
self.subsystem_metrics.inc("task_manager_running_processed", len(running_tasks))
pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending']
undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed]
dependencies = self.generate_dependencies(undeped_tasks)
deps_of_deps = self.generate_dependencies(dependencies)
dependencies += deps_of_deps
self.process_pending_tasks(dependencies)
self.process_pending_tasks(pending_tasks)
self.subsystem_metrics.inc("task_manager_pending_processed", len(dependencies))
self.process_pending_tasks(pending_tasks)
self.subsystem_metrics.inc("task_manager_pending_processed", len(pending_tasks))
@timeit
def _schedule(self):
finished_wfjs = []
all_sorted_tasks = self.get_tasks()
@@ -648,6 +683,28 @@ class TaskManager:
self.process_tasks(all_sorted_tasks)
return finished_wfjs
def record_aggregate_metrics(self, *args):
if not settings.IS_TESTING():
# increment task_manager_schedule_calls regardless if the other
# metrics are recorded
s_metrics.Metrics(auto_pipe_execute=True).inc("task_manager_schedule_calls", 1)
# Only record metrics if the last time recording was more
# than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago.
# Prevents a short-duration task manager that runs directly after a
# long task manager to override useful metrics.
current_time = time.time()
time_last_recorded = current_time - self.subsystem_metrics.decode("task_manager_recorded_timestamp")
if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL:
logger.debug(f"recording metrics, last recorded {time_last_recorded} seconds ago")
self.subsystem_metrics.set("task_manager_recorded_timestamp", current_time)
self.subsystem_metrics.pipe_execute()
else:
logger.debug(f"skipping recording metrics, last recorded {time_last_recorded} seconds ago")
def record_aggregate_metrics_and_exit(self, *args):
self.record_aggregate_metrics()
sys.exit(1)
def schedule(self):
# Lock
with advisory_lock('task_manager_lock', wait=False) as acquired:
@@ -657,5 +714,8 @@ class TaskManager:
return
logger.debug("Starting Scheduler")
with task_manager_bulk_reschedule():
# if sigterm due to timeout, still record metrics
signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit)
self._schedule()
self.record_aggregate_metrics()
logger.debug("Finishing Scheduler")