Add logs to debug waiting bottlenecks

This commit is contained in:
Alan Rominger
2022-07-13 09:39:58 -04:00
parent cfc1255812
commit a3fef27002
6 changed files with 49 additions and 9 deletions

View File

@@ -22,7 +22,7 @@ import psutil
from awx.main.models import UnifiedJob from awx.main.models import UnifiedJob
from awx.main.dispatch import reaper from awx.main.dispatch import reaper
from awx.main.utils.common import convert_mem_str_to_bytes, get_mem_effective_capacity from awx.main.utils.common import convert_mem_str_to_bytes, get_mem_effective_capacity, log_excess_runtime
if 'run_callback_receiver' in sys.argv: if 'run_callback_receiver' in sys.argv:
logger = logging.getLogger('awx.main.commands.run_callback_receiver') logger = logging.getLogger('awx.main.commands.run_callback_receiver')
@@ -364,6 +364,7 @@ class AutoscalePool(WorkerPool):
def debug_meta(self): def debug_meta(self):
return 'min={} max={}'.format(self.min_workers, self.max_workers) return 'min={} max={}'.format(self.min_workers, self.max_workers)
@log_excess_runtime(logger)
def cleanup(self): def cleanup(self):
""" """
Perform some internal account and cleanup. This is run on Perform some internal account and cleanup. This is run on
@@ -380,6 +381,7 @@ class AutoscalePool(WorkerPool):
if there's an outage, this method _can_ throw various if there's an outage, this method _can_ throw various
django.db.utils.Error exceptions. Act accordingly. django.db.utils.Error exceptions. Act accordingly.
""" """
start_time = time.time()
orphaned = [] orphaned = []
for w in self.workers[::]: for w in self.workers[::]:
if not w.alive: if not w.alive:
@@ -432,16 +434,16 @@ class AutoscalePool(WorkerPool):
idx = random.choice(range(len(self.workers))) idx = random.choice(range(len(self.workers)))
self.write(idx, m) self.write(idx, m)
# if we are not in the dangerous situation of queue backup then clear old waiting jobs # if the database says a job is running or queued on this node, but it's *not*,
if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1:
reaper.reap_waiting()
# if the database says a job is running on this node, but it's *not*,
# then reap it # then reap it
running_uuids = [] running_uuids = []
for worker in self.workers: for worker in self.workers:
worker.calculate_managed_tasks() worker.calculate_managed_tasks()
running_uuids.extend(list(worker.managed_tasks.keys())) running_uuids.extend(list(worker.managed_tasks.keys()))
delta = time.time() - start_time
if delta > 1.0:
logger.warning(f'Took {delta} for internal part of cleanup')
start_time = time.time()
reaper.reap(excluded_uuids=running_uuids) reaper.reap(excluded_uuids=running_uuids)
def up(self): def up(self):
@@ -471,6 +473,10 @@ class AutoscalePool(WorkerPool):
w.put(body) w.put(body)
break break
else: else:
task_name = 'unknown'
if isinstance(body, dict):
task_name = body.get('task')
logger.warn(f'Workers maxed, queuing {task_name}, load: {sum(len(w.managed_tasks) for w in self.workers)} / {len(self.workers)}')
return super(AutoscalePool, self).write(preferred_queue, body) return super(AutoscalePool, self).write(preferred_queue, body)
except Exception: except Exception:
for conn in connections.all(): for conn in connections.all():

View File

@@ -2,6 +2,7 @@ import inspect
import logging import logging
import sys import sys
import json import json
import time
from uuid import uuid4 from uuid import uuid4
from django.conf import settings from django.conf import settings
@@ -75,7 +76,7 @@ class task:
msg = f'{cls.name}: Queue value required and may not be None' msg = f'{cls.name}: Queue value required and may not be None'
logger.error(msg) logger.error(msg)
raise ValueError(msg) raise ValueError(msg)
obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name} obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name, 'time_pub': time.time()}
guid = get_guid() guid = get_guid()
if guid: if guid:
obj['guid'] = guid obj['guid'] = guid

View File

@@ -17,6 +17,7 @@ from django.conf import settings
from awx.main.dispatch.pool import WorkerPool from awx.main.dispatch.pool import WorkerPool
from awx.main.dispatch import pg_bus_conn from awx.main.dispatch import pg_bus_conn
from awx.main.utils.common import log_excess_runtime
if 'run_callback_receiver' in sys.argv: if 'run_callback_receiver' in sys.argv:
logger = logging.getLogger('awx.main.commands.run_callback_receiver') logger = logging.getLogger('awx.main.commands.run_callback_receiver')
@@ -81,6 +82,9 @@ class AWXConsumerBase(object):
logger.error('unrecognized control message: {}'.format(control)) logger.error('unrecognized control message: {}'.format(control))
def process_task(self, body): def process_task(self, body):
if isinstance(body, dict):
body['time_ack'] = time.time()
if 'control' in body: if 'control' in body:
try: try:
return self.control(body) return self.control(body)
@@ -101,6 +105,7 @@ class AWXConsumerBase(object):
self.total_messages += 1 self.total_messages += 1
self.record_statistics() self.record_statistics()
@log_excess_runtime(logger)
def record_statistics(self): def record_statistics(self):
if time.time() - self.last_stats > 1: # buffer stat recording to once per second if time.time() - self.last_stats > 1: # buffer stat recording to once per second
try: try:

View File

@@ -183,7 +183,6 @@ class CallbackBrokerWorker(BaseWorker):
except Exception as exc_indv: except Exception as exc_indv:
consecutive_errors += 1 consecutive_errors += 1
logger.info(f'Database Error Saving individual Job Event, error {str(exc_indv)}') logger.info(f'Database Error Saving individual Job Event, error {str(exc_indv)}')
if consecutive_errors >= 5: if consecutive_errors >= 5:
raise raise
metrics_singular_events_saved += events_saved metrics_singular_events_saved += events_saved

View File

@@ -3,6 +3,7 @@ import logging
import importlib import importlib
import sys import sys
import traceback import traceback
import time
from kubernetes.config import kube_config from kubernetes.config import kube_config
@@ -60,8 +61,19 @@ class TaskWorker(BaseWorker):
# the callable is a class, e.g., RunJob; instantiate and # the callable is a class, e.g., RunJob; instantiate and
# return its `run()` method # return its `run()` method
_call = _call().run _call = _call().run
log_extra = ''
logger_method = logger.debug
if ('time_ack' in body) and ('time_pub' in body):
time_publish = body['time_ack'] - body['time_pub']
time_waiting = time.time() - body['time_ack']
if time_waiting > 5.0 or time_publish > 5.0:
# If the task took a very long time to process, add this information to the log
log_extra = f' took {time_publish:.4f} to ack, {time_waiting:.4f} in local dispatcher'
logger_method = logger.info
# don't print kwargs, they often contain launch-time secrets # don't print kwargs, they often contain launch-time secrets
logger.debug('task {} starting {}(*{})'.format(uuid, task, args)) logger_method(f'task {uuid} starting {task}(*{args}){log_extra}')
return _call(*args, **kwargs) return _call(*args, **kwargs)
def perform_work(self, body): def perform_work(self, body):

View File

@@ -6,6 +6,7 @@ from datetime import timedelta
import json import json
import yaml import yaml
import logging import logging
import time
import os import os
import subprocess import subprocess
import re import re
@@ -1183,3 +1184,19 @@ def cleanup_new_process(func):
return func(*args, **kwargs) return func(*args, **kwargs)
return wrapper_cleanup_new_process return wrapper_cleanup_new_process
def log_excess_runtime(func_logger, cutoff=5.0):
    """Decorator factory that logs when the wrapped callable runs too long.

    :param func_logger: logger instance used to emit the slow-runtime message.
    :param cutoff: runtime threshold in seconds; runs longer than this are logged.
    :returns: a decorator that wraps a function, times each call, and logs an
        INFO message via ``func_logger`` if the call exceeded ``cutoff``.
    """
    def log_excess_runtime_decorator(func):
        @wraps(func)
        def _new_func(*args, **kwargs):
            start_time = time.time()
            return_value = func(*args, **kwargs)
            delta = time.time() - start_time
            if delta > cutoff:
                # Bug fix: use the logger the caller passed in (func_logger),
                # not an unrelated module-level `logger`, which was ignored
                # before and may not point at the intended log destination.
                func_logger.info(f'Running {func.__name__!r} took {delta:.2f}s')
            return return_value
        return _new_func
    return log_excess_runtime_decorator