diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index d9a3f36324..ce6c297861 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -22,7 +22,7 @@ import psutil from awx.main.models import UnifiedJob from awx.main.dispatch import reaper -from awx.main.utils.common import convert_mem_str_to_bytes, get_mem_effective_capacity +from awx.main.utils.common import convert_mem_str_to_bytes, get_mem_effective_capacity, log_excess_runtime if 'run_callback_receiver' in sys.argv: logger = logging.getLogger('awx.main.commands.run_callback_receiver') @@ -341,12 +341,16 @@ class AutoscalePool(WorkerPool): # Get same number as max forks based on memory, this function takes memory as bytes self.max_workers = get_mem_effective_capacity(total_memory_gb * 2**30) + # add magic prime number of extra workers to ensure + # we have a few extra workers to run the heartbeat + self.max_workers += 7 + # max workers can't be less than min_workers self.max_workers = max(self.min_workers, self.max_workers) - def debug(self, *args, **kwargs): - self.cleanup() - return super(AutoscalePool, self).debug(*args, **kwargs) + # the task manager enforces settings.TASK_MANAGER_TIMEOUT on its own + # but if the task takes longer than the time defined here, we will force it to stop here + self.task_manager_timeout = settings.TASK_MANAGER_TIMEOUT + settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD @property def should_grow(self): @@ -364,6 +368,7 @@ class AutoscalePool(WorkerPool): def debug_meta(self): return 'min={} max={}'.format(self.min_workers, self.max_workers) + @log_excess_runtime(logger) def cleanup(self): """ Perform some internal account and cleanup. 
This is run on @@ -421,8 +426,8 @@ class AutoscalePool(WorkerPool): w.managed_tasks[current_task['uuid']]['started'] = time.time() age = time.time() - current_task['started'] w.managed_tasks[current_task['uuid']]['age'] = age - if age > (settings.TASK_MANAGER_TIMEOUT + settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD): - logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGTERM to {w.pid}') # noqa + if age > self.task_manager_timeout: + logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGTERM to {w.pid}') os.kill(w.pid, signal.SIGTERM) for m in orphaned: @@ -432,16 +437,17 @@ class AutoscalePool(WorkerPool): idx = random.choice(range(len(self.workers))) self.write(idx, m) - # if we are not in the dangerous situation of queue backup then clear old waiting jobs - if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1: - reaper.reap_waiting() - - # if the database says a job is running on this node, but it's *not*, + # if the database says a job is running or queued on this node, but it's *not*, # then reap it running_uuids = [] for worker in self.workers: worker.calculate_managed_tasks() running_uuids.extend(list(worker.managed_tasks.keys())) + + # if we are not in the dangerous situation of queue backup then clear old waiting jobs + if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1: + reaper.reap_waiting(excluded_uuids=running_uuids) + reaper.reap(excluded_uuids=running_uuids) def up(self): @@ -471,6 +477,10 @@ class AutoscalePool(WorkerPool): w.put(body) break else: + task_name = 'unknown' + if isinstance(body, dict): + task_name = body.get('task', task_name) + logger.warning(f'Workers maxed, queuing {task_name}, load: {sum(len(w.managed_tasks) for w in self.workers)} / {len(self.workers)}') return super(AutoscalePool, self).write(preferred_queue, body) except Exception: for conn in connections.all(): diff --git a/awx/main/dispatch/publish.py b/awx/main/dispatch/publish.py index 
e873465155..dd19c1338c 100644 --- a/awx/main/dispatch/publish.py +++ b/awx/main/dispatch/publish.py @@ -2,6 +2,7 @@ import inspect import logging import sys import json +import time from uuid import uuid4 from django.conf import settings @@ -75,7 +76,7 @@ class task: msg = f'{cls.name}: Queue value required and may not be None' logger.error(msg) raise ValueError(msg) - obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name} + obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name, 'time_pub': time.time()} guid = get_guid() if guid: obj['guid'] = guid diff --git a/awx/main/dispatch/reaper.py b/awx/main/dispatch/reaper.py index fc67b5762f..7a0ae1b884 100644 --- a/awx/main/dispatch/reaper.py +++ b/awx/main/dispatch/reaper.py @@ -2,6 +2,7 @@ from datetime import timedelta import logging from django.db.models import Q +from django.conf import settings from django.utils.timezone import now as tz_now from django.contrib.contenttypes.models import ContentType @@ -33,29 +34,34 @@ def startup_reaping(): logger.error(f'Unified jobs {job_ids} were reaped on dispatch startup') -def reap_job(j, status): - if UnifiedJob.objects.get(id=j.id).status not in ('running', 'waiting'): +def reap_job(j, status, job_explanation=None): + j.refresh_from_db(fields=['status', 'job_explanation']) + status_before = j.status + if status_before not in ('running', 'waiting'): # just in case, don't reap jobs that aren't running return j.status = status j.start_args = '' # blank field to remove encrypted passwords - j.job_explanation += ' '.join( - ( - 'Task was marked as running but was not present in', - 'the job queue, so it has been marked as failed.', - ) - ) + if j.job_explanation: + j.job_explanation += ' ' # Separate messages for readability + if job_explanation is None: + j.job_explanation += 'Task was marked as running but was not present in the job queue, so it has been marked as failed.' 
+ else: + j.job_explanation += job_explanation j.save(update_fields=['status', 'start_args', 'job_explanation']) if hasattr(j, 'send_notification_templates'): j.send_notification_templates('failed') j.websocket_emit_status(status) - logger.error('{} is no longer running; reaping'.format(j.log_format)) + logger.error(f'{j.log_format} is no longer {status_before}; reaping') -def reap_waiting(instance=None, status='failed', grace_period=60): +def reap_waiting(instance=None, status='failed', job_explanation=None, grace_period=None, excluded_uuids=None): """ Reap all jobs in waiting for this instance. """ + if grace_period is None: + grace_period = settings.JOB_WAITING_GRACE_PERIOD + settings.TASK_MANAGER_TIMEOUT + me = instance if me is None: try: @@ -65,11 +71,13 @@ def reap_waiting(instance=None, status='failed', grace_period=60): return now = tz_now() jobs = UnifiedJob.objects.filter(status='waiting', modified__lte=now - timedelta(seconds=grace_period), controller_node=me.hostname) + if excluded_uuids: + jobs = jobs.exclude(celery_task_id__in=excluded_uuids) for j in jobs: - reap_job(j, status) + reap_job(j, status, job_explanation=job_explanation) -def reap(instance=None, status='failed', excluded_uuids=[]): +def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=None): """ Reap all jobs in running for this instance. 
""" @@ -83,6 +91,8 @@ def reap(instance=None, status='failed', excluded_uuids=[]): workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id jobs = UnifiedJob.objects.filter( Q(status='running') & (Q(execution_node=me.hostname) | Q(controller_node=me.hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id) - ).exclude(celery_task_id__in=excluded_uuids) + ) + if excluded_uuids: + jobs = jobs.exclude(celery_task_id__in=excluded_uuids) for j in jobs: - reap_job(j, status) + reap_job(j, status, job_explanation=job_explanation) diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index 46418828b6..b982cb8ab4 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -17,6 +17,7 @@ from django.conf import settings from awx.main.dispatch.pool import WorkerPool from awx.main.dispatch import pg_bus_conn +from awx.main.utils.common import log_excess_runtime if 'run_callback_receiver' in sys.argv: logger = logging.getLogger('awx.main.commands.run_callback_receiver') @@ -81,6 +82,9 @@ class AWXConsumerBase(object): logger.error('unrecognized control message: {}'.format(control)) def process_task(self, body): + if isinstance(body, dict): + body['time_ack'] = time.time() + if 'control' in body: try: return self.control(body) @@ -101,6 +105,7 @@ class AWXConsumerBase(object): self.total_messages += 1 self.record_statistics() + @log_excess_runtime(logger) def record_statistics(self): if time.time() - self.last_stats > 1: # buffer stat recording to once per second try: diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index 68915921fb..0578a4ff97 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -183,7 +183,6 @@ class CallbackBrokerWorker(BaseWorker): except Exception as exc_indv: consecutive_errors += 1 logger.info(f'Database Error Saving individual Job Event, error {str(exc_indv)}') - if consecutive_errors >= 5: raise 
metrics_singular_events_saved += events_saved diff --git a/awx/main/dispatch/worker/task.py b/awx/main/dispatch/worker/task.py index e1fe196ddb..04f63002c5 100644 --- a/awx/main/dispatch/worker/task.py +++ b/awx/main/dispatch/worker/task.py @@ -3,6 +3,7 @@ import logging import importlib import sys import traceback +import time from kubernetes.config import kube_config @@ -60,8 +61,19 @@ class TaskWorker(BaseWorker): # the callable is a class, e.g., RunJob; instantiate and # return its `run()` method _call = _call().run + + log_extra = '' + logger_method = logger.debug + if ('time_ack' in body) and ('time_pub' in body): + time_publish = body['time_ack'] - body['time_pub'] + time_waiting = time.time() - body['time_ack'] + if time_waiting > 5.0 or time_publish > 5.0: + # If task took a very long time to process, add this information to the log + log_extra = f' took {time_publish:.4f} to ack, {time_waiting:.4f} in local dispatcher' + logger_method = logger.info # don't print kwargs, they often contain launch-time secrets - logger.debug('task {} starting {}(*{})'.format(uuid, task, args)) + logger_method(f'task {uuid} starting {task}(*{args}){log_extra}') + return _call(*args, **kwargs) def perform_work(self, body): diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 3a6b7740a2..5f9588f627 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -207,7 +207,7 @@ class Instance(HasPolicyEditsMixin, BaseModel): return True if ref_time is None: ref_time = now() - grace_period = settings.CLUSTER_NODE_HEARTBEAT_PERIOD * 2 + grace_period = settings.CLUSTER_NODE_HEARTBEAT_PERIOD * settings.CLUSTER_NODE_MISSED_HEARTBEAT_TOLERANCE if self.node_type in ('execution', 'hop'): grace_period += settings.RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD return self.last_seen < ref_time - timedelta(seconds=grace_period) diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py index 954ced13fd..774469c3ba 100644 --- a/awx/main/tasks/jobs.py +++ b/awx/main/tasks/jobs.py @@ 
-608,8 +608,12 @@ class BaseTask(object): status = 'failed' elif status == 'canceled': self.instance = self.update_model(pk) - if (getattr(self.instance, 'cancel_flag', False) is False) and signal_callback(): - self.runner_callback.delay_update(job_explanation="Task was canceled due to receiving a shutdown signal.") + cancel_flag_value = getattr(self.instance, 'cancel_flag', False) + if (cancel_flag_value is False) and signal_callback(): + self.runner_callback.delay_update(skip_if_already_set=True, job_explanation="Task was canceled due to receiving a shutdown signal.") + status = 'failed' + elif cancel_flag_value is False: + self.runner_callback.delay_update(skip_if_already_set=True, job_explanation="The running ansible process received a shutdown signal.") status = 'failed' except ReceptorNodeNotFound as exc: self.runner_callback.delay_update(job_explanation=str(exc)) diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index 932c8abe5b..e36c502400 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -15,7 +15,7 @@ from distutils.version import LooseVersion as Version from django.conf import settings from django.db import transaction, DatabaseError, IntegrityError from django.db.models.fields.related import ForeignKey -from django.utils.timezone import now +from django.utils.timezone import now, timedelta from django.utils.encoding import smart_str from django.contrib.auth.models import User from django.utils.translation import gettext_lazy as _ @@ -506,10 +506,15 @@ def cluster_node_heartbeat(): if this_inst: startup_event = this_inst.is_lost(ref_time=nowtime) + last_last_seen = this_inst.last_seen this_inst.local_health_check() if startup_event and this_inst.capacity != 0: - logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname)) + logger.warning(f'Rejoining the cluster as instance {this_inst.hostname}. 
Prior last_seen {last_last_seen}') return + elif not last_last_seen: + logger.warning(f'Instance does not have recorded last_seen, updating to {nowtime}') + elif (nowtime - last_last_seen) > timedelta(seconds=settings.CLUSTER_NODE_HEARTBEAT_PERIOD + 2): + logger.warning(f'Heartbeat skew - interval={(nowtime - last_last_seen).total_seconds():.4f}, expected={settings.CLUSTER_NODE_HEARTBEAT_PERIOD}') else: if settings.AWX_AUTO_DEPROVISION_INSTANCES: (changed, this_inst) = Instance.objects.register(ip_address=os.environ.get('MY_POD_IP'), node_type='control', uuid=settings.SYSTEM_UUID) @@ -537,8 +542,9 @@ def cluster_node_heartbeat(): for other_inst in lost_instances: try: - reaper.reap(other_inst) - reaper.reap_waiting(this_inst, grace_period=0) + explanation = "Job reaped due to instance shutdown" + reaper.reap(other_inst, job_explanation=explanation) + reaper.reap_waiting(other_inst, grace_period=0, job_explanation=explanation) except Exception: logger.exception('failed to reap jobs for {}'.format(other_inst.hostname)) try: @@ -603,7 +609,8 @@ def awx_k8s_reaper(): for group in InstanceGroup.objects.filter(is_container_group=True).iterator(): logger.debug("Checking for orphaned k8s pods for {}.".format(group)) pods = PodManager.list_active_jobs(group) - for job in UnifiedJob.objects.filter(pk__in=pods.keys()).exclude(status__in=ACTIVE_STATES): + time_cutoff = now() - timedelta(seconds=settings.K8S_POD_REAPER_GRACE_PERIOD) + for job in UnifiedJob.objects.filter(pk__in=pods.keys(), finished__lte=time_cutoff).exclude(status__in=ACTIVE_STATES): logger.debug('{} is no longer active, reaping orphaned k8s pod'.format(job.log_format)) try: pm = PodManager(job) diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 29ebff1178..627e38a1fb 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -6,6 +6,7 @@ from datetime import timedelta import json import yaml import logging +import time import os import subprocess import re @@ -1183,3 +1184,19 
@@ def cleanup_new_process(func): return func(*args, **kwargs) return wrapper_cleanup_new_process + + +def log_excess_runtime(func_logger, cutoff=5.0): + def log_excess_runtime_decorator(func): + @wraps(func) + def _new_func(*args, **kwargs): + start_time = time.time() + return_value = func(*args, **kwargs) + delta = time.time() - start_time + if delta > cutoff: + func_logger.info(f'Running {func.__name__!r} took {delta:.2f}s') + return return_value + + return _new_func + + return log_excess_runtime_decorator diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index bf20f8b9f8..a24fe6f090 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -432,6 +432,10 @@ os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199') # heartbeat period can factor into some forms of logic, so it is maintained as a setting here CLUSTER_NODE_HEARTBEAT_PERIOD = 60 + +# Number of missed heartbeats until a node gets marked as lost +CLUSTER_NODE_MISSED_HEARTBEAT_TOLERANCE = 2 + RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD = 60 # https://github.com/ansible/receptor/blob/aa1d589e154d8a0cb99a220aff8f98faf2273be6/pkg/netceptor/netceptor.go#L34 EXECUTION_NODE_REMEDIATION_CHECKS = 60 * 30 # once every 30 minutes check if an execution node errors have been resolved @@ -1028,3 +1032,14 @@ DEFAULT_CONTAINER_RUN_OPTIONS = ['--network', 'slirp4netns:enable_ipv6=true'] # Mount exposed paths as hostPath resource in k8s/ocp AWX_MOUNT_ISOLATED_PATHS_ON_K8S = False + +# Time out task managers if they take longer than this many seconds +TASK_MANAGER_TIMEOUT = 300 + +# Number of seconds _in addition to_ the task manager timeout a job can stay +# in waiting without being reaped +JOB_WAITING_GRACE_PERIOD = 60 + +# Number of seconds after a container group job finished time to wait +# before the awx_k8s_reaper task will tear down the pods +K8S_POD_REAPER_GRACE_PERIOD = 60