diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index d9a3f36324..f700656ea4 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -22,7 +22,7 @@ import psutil from awx.main.models import UnifiedJob from awx.main.dispatch import reaper -from awx.main.utils.common import convert_mem_str_to_bytes, get_mem_effective_capacity +from awx.main.utils.common import convert_mem_str_to_bytes, get_mem_effective_capacity, log_excess_runtime if 'run_callback_receiver' in sys.argv: logger = logging.getLogger('awx.main.commands.run_callback_receiver') @@ -364,6 +364,7 @@ class AutoscalePool(WorkerPool): def debug_meta(self): return 'min={} max={}'.format(self.min_workers, self.max_workers) + @log_excess_runtime(logger) def cleanup(self): """ Perform some internal account and cleanup. This is run on @@ -380,6 +381,7 @@ class AutoscalePool(WorkerPool): if there's an outage, this method _can_ throw various django.db.utils.Error exceptions. Act accordingly. """ + start_time = time.time() orphaned = [] for w in self.workers[::]: if not w.alive: @@ -432,16 +434,16 @@ class AutoscalePool(WorkerPool): idx = random.choice(range(len(self.workers))) self.write(idx, m) - # if we are not in the dangerous situation of queue backup then clear old waiting jobs - if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1: - reaper.reap_waiting() - - # if the database says a job is running on this node, but it's *not*, + # if the database says a job is running or queued on this node, but it's *not*, # then reap it running_uuids = [] for worker in self.workers: worker.calculate_managed_tasks() running_uuids.extend(list(worker.managed_tasks.keys())) + delta = time.time() - start_time + if delta > 1.0: + logger.warning(f'Took {delta} for internal part of cleanup') + start_time = time.time() reaper.reap(excluded_uuids=running_uuids) def up(self): @@ -471,6 +473,10 @@ class AutoscalePool(WorkerPool): w.put(body) break else: + task_name = 'unknown' + 
if isinstance(body, dict): + task_name = body.get('task') + logger.warning(f'Workers maxed, queuing {task_name}, load: {sum(len(w.managed_tasks) for w in self.workers)} / {len(self.workers)}') return super(AutoscalePool, self).write(preferred_queue, body) except Exception: for conn in connections.all(): diff --git a/awx/main/dispatch/publish.py b/awx/main/dispatch/publish.py index e873465155..dd19c1338c 100644 --- a/awx/main/dispatch/publish.py +++ b/awx/main/dispatch/publish.py @@ -2,6 +2,7 @@ import inspect import logging import sys import json +import time from uuid import uuid4 from django.conf import settings @@ -75,7 +76,7 @@ class task: msg = f'{cls.name}: Queue value required and may not be None' logger.error(msg) raise ValueError(msg) - obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name} + obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name, 'time_pub': time.time()} guid = get_guid() if guid: obj['guid'] = guid diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index 46418828b6..b982cb8ab4 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -17,6 +17,7 @@ from django.conf import settings from awx.main.dispatch.pool import WorkerPool from awx.main.dispatch import pg_bus_conn +from awx.main.utils.common import log_excess_runtime if 'run_callback_receiver' in sys.argv: logger = logging.getLogger('awx.main.commands.run_callback_receiver') @@ -81,6 +82,9 @@ class AWXConsumerBase(object): logger.error('unrecognized control message: {}'.format(control)) def process_task(self, body): + if isinstance(body, dict): + body['time_ack'] = time.time() + if 'control' in body: try: return self.control(body) @@ -101,6 +105,7 @@ class AWXConsumerBase(object): self.total_messages += 1 self.record_statistics() + @log_excess_runtime(logger) def record_statistics(self): if time.time() - self.last_stats > 1: # buffer stat recording to once per second try: diff --git 
a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index 68915921fb..0578a4ff97 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -183,7 +183,6 @@ class CallbackBrokerWorker(BaseWorker): except Exception as exc_indv: consecutive_errors += 1 logger.info(f'Database Error Saving individual Job Event, error {str(exc_indv)}') - if consecutive_errors >= 5: raise metrics_singular_events_saved += events_saved diff --git a/awx/main/dispatch/worker/task.py b/awx/main/dispatch/worker/task.py index e1fe196ddb..04f63002c5 100644 --- a/awx/main/dispatch/worker/task.py +++ b/awx/main/dispatch/worker/task.py @@ -3,6 +3,7 @@ import logging import importlib import sys import traceback +import time from kubernetes.config import kube_config @@ -60,8 +61,19 @@ class TaskWorker(BaseWorker): # the callable is a class, e.g., RunJob; instantiate and # return its `run()` method _call = _call().run + + log_extra = '' + logger_method = logger.debug + if ('time_ack' in body) and ('time_pub' in body): + time_publish = body['time_ack'] - body['time_pub'] + time_waiting = time.time() - body['time_ack'] + if time_waiting > 5.0 or time_publish > 5.0: + # If task took a very long time to process, add this information to the log + log_extra = f' took {time_publish:.4f} to ack, {time_waiting:.4f} in local dispatcher' + logger_method = logger.info # don't print kwargs, they often contain launch-time secrets - logger.debug('task {} starting {}(*{})'.format(uuid, task, args)) + logger_method(f'task {uuid} starting {task}(*{args}){log_extra}') + return _call(*args, **kwargs) def perform_work(self, body): diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 29ebff1178..627e38a1fb 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -6,6 +6,7 @@ from datetime import timedelta import json import yaml import logging +import time import os import subprocess import re @@ -1183,3 +1184,19 @@ def 
cleanup_new_process(func): return func(*args, **kwargs) return wrapper_cleanup_new_process + + +def log_excess_runtime(func_logger, cutoff=5.0): + def log_excess_runtime_decorator(func): + @wraps(func) + def _new_func(*args, **kwargs): + start_time = time.time() + return_value = func(*args, **kwargs) + delta = time.time() - start_time + if delta > cutoff: + func_logger.info(f'Running {func.__name__!r} took {delta:.2f}s') + return return_value + + return _new_func + + return log_excess_runtime_decorator