Merge pull request #12676 from AlanCoding/forward_picks

Stability fixes, and related logging for slowdowns in dispatcher task processing
This commit is contained in:
Alan Rominger
2022-08-17 13:32:34 -04:00
committed by GitHub
11 changed files with 116 additions and 36 deletions

View File

@@ -22,7 +22,7 @@ import psutil
from awx.main.models import UnifiedJob from awx.main.models import UnifiedJob
from awx.main.dispatch import reaper from awx.main.dispatch import reaper
from awx.main.utils.common import convert_mem_str_to_bytes, get_mem_effective_capacity from awx.main.utils.common import convert_mem_str_to_bytes, get_mem_effective_capacity, log_excess_runtime
if 'run_callback_receiver' in sys.argv: if 'run_callback_receiver' in sys.argv:
logger = logging.getLogger('awx.main.commands.run_callback_receiver') logger = logging.getLogger('awx.main.commands.run_callback_receiver')
@@ -341,12 +341,16 @@ class AutoscalePool(WorkerPool):
# Get same number as max forks based on memory, this function takes memory as bytes # Get same number as max forks based on memory, this function takes memory as bytes
self.max_workers = get_mem_effective_capacity(total_memory_gb * 2**30) self.max_workers = get_mem_effective_capacity(total_memory_gb * 2**30)
# add magic prime number of extra workers to ensure
# we have a few extra workers to run the heartbeat
self.max_workers += 7
# max workers can't be less than min_workers # max workers can't be less than min_workers
self.max_workers = max(self.min_workers, self.max_workers) self.max_workers = max(self.min_workers, self.max_workers)
def debug(self, *args, **kwargs): # the task manager enforces settings.TASK_MANAGER_TIMEOUT on its own
self.cleanup() # but if the task takes longer than the time defined here, we will force it to stop here
return super(AutoscalePool, self).debug(*args, **kwargs) self.task_manager_timeout = settings.TASK_MANAGER_TIMEOUT + settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD
@property @property
def should_grow(self): def should_grow(self):
@@ -364,6 +368,7 @@ class AutoscalePool(WorkerPool):
def debug_meta(self): def debug_meta(self):
return 'min={} max={}'.format(self.min_workers, self.max_workers) return 'min={} max={}'.format(self.min_workers, self.max_workers)
@log_excess_runtime(logger)
def cleanup(self): def cleanup(self):
""" """
Perform some internal account and cleanup. This is run on Perform some internal account and cleanup. This is run on
@@ -421,8 +426,8 @@ class AutoscalePool(WorkerPool):
w.managed_tasks[current_task['uuid']]['started'] = time.time() w.managed_tasks[current_task['uuid']]['started'] = time.time()
age = time.time() - current_task['started'] age = time.time() - current_task['started']
w.managed_tasks[current_task['uuid']]['age'] = age w.managed_tasks[current_task['uuid']]['age'] = age
if age > (settings.TASK_MANAGER_TIMEOUT + settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD): if age > self.task_manager_timeout:
logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGTERM to {w.pid}') # noqa logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGTERM to {w.pid}')
os.kill(w.pid, signal.SIGTERM) os.kill(w.pid, signal.SIGTERM)
for m in orphaned: for m in orphaned:
@@ -432,16 +437,17 @@ class AutoscalePool(WorkerPool):
idx = random.choice(range(len(self.workers))) idx = random.choice(range(len(self.workers)))
self.write(idx, m) self.write(idx, m)
# if we are not in the dangerous situation of queue backup then clear old waiting jobs # if the database says a job is running or queued on this node, but it's *not*,
if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1:
reaper.reap_waiting()
# if the database says a job is running on this node, but it's *not*,
# then reap it # then reap it
running_uuids = [] running_uuids = []
for worker in self.workers: for worker in self.workers:
worker.calculate_managed_tasks() worker.calculate_managed_tasks()
running_uuids.extend(list(worker.managed_tasks.keys())) running_uuids.extend(list(worker.managed_tasks.keys()))
# if we are not in the dangerous situation of queue backup then clear old waiting jobs
if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1:
reaper.reap_waiting(excluded_uuids=running_uuids)
reaper.reap(excluded_uuids=running_uuids) reaper.reap(excluded_uuids=running_uuids)
def up(self): def up(self):
@@ -471,6 +477,10 @@ class AutoscalePool(WorkerPool):
w.put(body) w.put(body)
break break
else: else:
task_name = 'unknown'
if isinstance(body, dict):
task_name = body.get('task')
logger.warn(f'Workers maxed, queuing {task_name}, load: {sum(len(w.managed_tasks) for w in self.workers)} / {len(self.workers)}')
return super(AutoscalePool, self).write(preferred_queue, body) return super(AutoscalePool, self).write(preferred_queue, body)
except Exception: except Exception:
for conn in connections.all(): for conn in connections.all():

View File

@@ -2,6 +2,7 @@ import inspect
import logging import logging
import sys import sys
import json import json
import time
from uuid import uuid4 from uuid import uuid4
from django.conf import settings from django.conf import settings
@@ -75,7 +76,7 @@ class task:
msg = f'{cls.name}: Queue value required and may not be None' msg = f'{cls.name}: Queue value required and may not be None'
logger.error(msg) logger.error(msg)
raise ValueError(msg) raise ValueError(msg)
obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name} obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name, 'time_pub': time.time()}
guid = get_guid() guid = get_guid()
if guid: if guid:
obj['guid'] = guid obj['guid'] = guid

View File

@@ -2,6 +2,7 @@ from datetime import timedelta
import logging import logging
from django.db.models import Q from django.db.models import Q
from django.conf import settings
from django.utils.timezone import now as tz_now from django.utils.timezone import now as tz_now
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
@@ -33,29 +34,34 @@ def startup_reaping():
logger.error(f'Unified jobs {job_ids} were reaped on dispatch startup') logger.error(f'Unified jobs {job_ids} were reaped on dispatch startup')
def reap_job(j, status, job_explanation=None):
    """Mark a job that is still recorded as running/waiting as reaped.

    :param j: job object; its ``status`` and ``job_explanation`` fields are
        refreshed from the database before anything is changed.
    :param status: status value to assign to the job (callers in this module
        default it to ``'failed'``).
    :param job_explanation: text appended to the job's existing explanation;
        when ``None`` a generic "not present in the job queue" message is used.
    :returns: ``None``. Returns early, without modifying the job, when the
        refreshed status is neither ``'running'`` nor ``'waiting'``.
    """
    # Re-read only the fields this function depends on, so the decision is
    # made against the current database state rather than a stale instance.
    j.refresh_from_db(fields=['status', 'job_explanation'])
    status_before = j.status
    if status_before not in ('running', 'waiting'):
        # just in case, don't reap jobs that aren't running
        return
    j.status = status
    j.start_args = ''  # blank field to remove encrypted passwords
    if j.job_explanation:
        j.job_explanation += ' '  # Separate messages for readability
    if job_explanation is None:
        j.job_explanation += 'Task was marked as running but was not present in the job queue, so it has been marked as failed.'
    else:
        j.job_explanation += job_explanation
    j.save(update_fields=['status', 'start_args', 'job_explanation'])
    # Not every job type exposes notification templates.
    if hasattr(j, 'send_notification_templates'):
        j.send_notification_templates('failed')
    j.websocket_emit_status(status)
    logger.error(f'{j.log_format} is no longer {status_before}; reaping')
def reap_waiting(instance=None, status='failed', grace_period=60): def reap_waiting(instance=None, status='failed', job_explanation=None, grace_period=None, excluded_uuids=None):
""" """
Reap all jobs in waiting for this instance. Reap all jobs in waiting for this instance.
""" """
if grace_period is None:
grace_period = settings.JOB_WAITING_GRACE_PERIOD + settings.TASK_MANAGER_TIMEOUT
me = instance me = instance
if me is None: if me is None:
try: try:
@@ -65,11 +71,13 @@ def reap_waiting(instance=None, status='failed', grace_period=60):
return return
now = tz_now() now = tz_now()
jobs = UnifiedJob.objects.filter(status='waiting', modified__lte=now - timedelta(seconds=grace_period), controller_node=me.hostname) jobs = UnifiedJob.objects.filter(status='waiting', modified__lte=now - timedelta(seconds=grace_period), controller_node=me.hostname)
if excluded_uuids:
jobs = jobs.exclude(celery_task_id__in=excluded_uuids)
for j in jobs: for j in jobs:
reap_job(j, status) reap_job(j, status, job_explanation=job_explanation)
def reap(instance=None, status='failed', excluded_uuids=[]): def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=None):
""" """
Reap all jobs in running for this instance. Reap all jobs in running for this instance.
""" """
@@ -83,6 +91,8 @@ def reap(instance=None, status='failed', excluded_uuids=[]):
workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id
jobs = UnifiedJob.objects.filter( jobs = UnifiedJob.objects.filter(
Q(status='running') & (Q(execution_node=me.hostname) | Q(controller_node=me.hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id) Q(status='running') & (Q(execution_node=me.hostname) | Q(controller_node=me.hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id)
).exclude(celery_task_id__in=excluded_uuids) )
if excluded_uuids:
jobs = jobs.exclude(celery_task_id__in=excluded_uuids)
for j in jobs: for j in jobs:
reap_job(j, status) reap_job(j, status, job_explanation=job_explanation)

View File

@@ -17,6 +17,7 @@ from django.conf import settings
from awx.main.dispatch.pool import WorkerPool from awx.main.dispatch.pool import WorkerPool
from awx.main.dispatch import pg_bus_conn from awx.main.dispatch import pg_bus_conn
from awx.main.utils.common import log_excess_runtime
if 'run_callback_receiver' in sys.argv: if 'run_callback_receiver' in sys.argv:
logger = logging.getLogger('awx.main.commands.run_callback_receiver') logger = logging.getLogger('awx.main.commands.run_callback_receiver')
@@ -81,6 +82,9 @@ class AWXConsumerBase(object):
logger.error('unrecognized control message: {}'.format(control)) logger.error('unrecognized control message: {}'.format(control))
def process_task(self, body): def process_task(self, body):
if isinstance(body, dict):
body['time_ack'] = time.time()
if 'control' in body: if 'control' in body:
try: try:
return self.control(body) return self.control(body)
@@ -101,6 +105,7 @@ class AWXConsumerBase(object):
self.total_messages += 1 self.total_messages += 1
self.record_statistics() self.record_statistics()
@log_excess_runtime(logger)
def record_statistics(self): def record_statistics(self):
if time.time() - self.last_stats > 1: # buffer stat recording to once per second if time.time() - self.last_stats > 1: # buffer stat recording to once per second
try: try:

View File

@@ -183,7 +183,6 @@ class CallbackBrokerWorker(BaseWorker):
except Exception as exc_indv: except Exception as exc_indv:
consecutive_errors += 1 consecutive_errors += 1
logger.info(f'Database Error Saving individual Job Event, error {str(exc_indv)}') logger.info(f'Database Error Saving individual Job Event, error {str(exc_indv)}')
if consecutive_errors >= 5: if consecutive_errors >= 5:
raise raise
metrics_singular_events_saved += events_saved metrics_singular_events_saved += events_saved

View File

@@ -3,6 +3,7 @@ import logging
import importlib import importlib
import sys import sys
import traceback import traceback
import time
from kubernetes.config import kube_config from kubernetes.config import kube_config
@@ -60,8 +61,19 @@ class TaskWorker(BaseWorker):
# the callable is a class, e.g., RunJob; instantiate and # the callable is a class, e.g., RunJob; instantiate and
# return its `run()` method # return its `run()` method
_call = _call().run _call = _call().run
log_extra = ''
logger_method = logger.debug
if ('time_ack' in body) and ('time_pub' in body):
time_publish = body['time_ack'] - body['time_pub']
time_waiting = time.time() - body['time_ack']
if time_waiting > 5.0 or time_publish > 5.0:
# If the task took a very long time to process, add this information to the log
log_extra = f' took {time_publish:.4f} to ack, {time_waiting:.4f} in local dispatcher'
logger_method = logger.info
# don't print kwargs, they often contain launch-time secrets # don't print kwargs, they often contain launch-time secrets
logger.debug('task {} starting {}(*{})'.format(uuid, task, args)) logger_method(f'task {uuid} starting {task}(*{args}){log_extra}')
return _call(*args, **kwargs) return _call(*args, **kwargs)
def perform_work(self, body): def perform_work(self, body):

View File

@@ -207,7 +207,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):
return True return True
if ref_time is None: if ref_time is None:
ref_time = now() ref_time = now()
grace_period = settings.CLUSTER_NODE_HEARTBEAT_PERIOD * 2 grace_period = settings.CLUSTER_NODE_HEARTBEAT_PERIOD * settings.CLUSTER_NODE_MISSED_HEARTBEAT_TOLERANCE
if self.node_type in ('execution', 'hop'): if self.node_type in ('execution', 'hop'):
grace_period += settings.RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD grace_period += settings.RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD
return self.last_seen < ref_time - timedelta(seconds=grace_period) return self.last_seen < ref_time - timedelta(seconds=grace_period)

View File

@@ -608,8 +608,12 @@ class BaseTask(object):
status = 'failed' status = 'failed'
elif status == 'canceled': elif status == 'canceled':
self.instance = self.update_model(pk) self.instance = self.update_model(pk)
if (getattr(self.instance, 'cancel_flag', False) is False) and signal_callback(): cancel_flag_value = getattr(self.instance, 'cancel_flag', False)
self.runner_callback.delay_update(job_explanation="Task was canceled due to receiving a shutdown signal.") if (cancel_flag_value is False) and signal_callback():
self.runner_callback.delay_update(skip_if_already_set=True, job_explanation="Task was canceled due to receiving a shutdown signal.")
status = 'failed'
elif cancel_flag_value is False:
self.runner_callback.delay_update(skip_if_already_set=True, job_explanation="The running ansible process received a shutdown signal.")
status = 'failed' status = 'failed'
except ReceptorNodeNotFound as exc: except ReceptorNodeNotFound as exc:
self.runner_callback.delay_update(job_explanation=str(exc)) self.runner_callback.delay_update(job_explanation=str(exc))

View File

@@ -15,7 +15,7 @@ from distutils.version import LooseVersion as Version
from django.conf import settings from django.conf import settings
from django.db import transaction, DatabaseError, IntegrityError from django.db import transaction, DatabaseError, IntegrityError
from django.db.models.fields.related import ForeignKey from django.db.models.fields.related import ForeignKey
from django.utils.timezone import now from django.utils.timezone import now, timedelta
from django.utils.encoding import smart_str from django.utils.encoding import smart_str
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
@@ -506,10 +506,15 @@ def cluster_node_heartbeat():
if this_inst: if this_inst:
startup_event = this_inst.is_lost(ref_time=nowtime) startup_event = this_inst.is_lost(ref_time=nowtime)
last_last_seen = this_inst.last_seen
this_inst.local_health_check() this_inst.local_health_check()
if startup_event and this_inst.capacity != 0: if startup_event and this_inst.capacity != 0:
logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname)) logger.warning(f'Rejoining the cluster as instance {this_inst.hostname}. Prior last_seen {last_last_seen}')
return return
elif not last_last_seen:
logger.warning(f'Instance does not have recorded last_seen, updating to {nowtime}')
elif (nowtime - last_last_seen) > timedelta(seconds=settings.CLUSTER_NODE_HEARTBEAT_PERIOD + 2):
logger.warning(f'Heartbeat skew - interval={(nowtime - last_last_seen).total_seconds():.4f}, expected={settings.CLUSTER_NODE_HEARTBEAT_PERIOD}')
else: else:
if settings.AWX_AUTO_DEPROVISION_INSTANCES: if settings.AWX_AUTO_DEPROVISION_INSTANCES:
(changed, this_inst) = Instance.objects.register(ip_address=os.environ.get('MY_POD_IP'), node_type='control', uuid=settings.SYSTEM_UUID) (changed, this_inst) = Instance.objects.register(ip_address=os.environ.get('MY_POD_IP'), node_type='control', uuid=settings.SYSTEM_UUID)
@@ -537,8 +542,9 @@ def cluster_node_heartbeat():
for other_inst in lost_instances: for other_inst in lost_instances:
try: try:
reaper.reap(other_inst) explanation = "Job reaped due to instance shutdown"
reaper.reap_waiting(this_inst, grace_period=0) reaper.reap(other_inst, job_explanation=explanation)
reaper.reap_waiting(other_inst, grace_period=0, job_explanation=explanation)
except Exception: except Exception:
logger.exception('failed to reap jobs for {}'.format(other_inst.hostname)) logger.exception('failed to reap jobs for {}'.format(other_inst.hostname))
try: try:
@@ -603,7 +609,8 @@ def awx_k8s_reaper():
for group in InstanceGroup.objects.filter(is_container_group=True).iterator(): for group in InstanceGroup.objects.filter(is_container_group=True).iterator():
logger.debug("Checking for orphaned k8s pods for {}.".format(group)) logger.debug("Checking for orphaned k8s pods for {}.".format(group))
pods = PodManager.list_active_jobs(group) pods = PodManager.list_active_jobs(group)
for job in UnifiedJob.objects.filter(pk__in=pods.keys()).exclude(status__in=ACTIVE_STATES): time_cutoff = now() - timedelta(seconds=settings.K8S_POD_REAPER_GRACE_PERIOD)
for job in UnifiedJob.objects.filter(pk__in=pods.keys(), finished__lte=time_cutoff).exclude(status__in=ACTIVE_STATES):
logger.debug('{} is no longer active, reaping orphaned k8s pod'.format(job.log_format)) logger.debug('{} is no longer active, reaping orphaned k8s pod'.format(job.log_format))
try: try:
pm = PodManager(job) pm = PodManager(job)

View File

@@ -6,6 +6,7 @@ from datetime import timedelta
import json import json
import yaml import yaml
import logging import logging
import time
import os import os
import subprocess import subprocess
import re import re
@@ -1183,3 +1184,19 @@ def cleanup_new_process(func):
return func(*args, **kwargs) return func(*args, **kwargs)
return wrapper_cleanup_new_process return wrapper_cleanup_new_process
def log_excess_runtime(func_logger, cutoff=5.0):
    """Build a decorator that logs calls which run longer than ``cutoff`` seconds.

    :param func_logger: logger-like object whose ``info`` method receives the
        message; this is the logger supplied by the code applying the decorator.
    :param cutoff: threshold in seconds; a call is logged only when it takes
        strictly longer than this.
    :returns: a decorator that wraps a callable, preserving its metadata and
        passing its return value through unchanged.

    Nothing is logged when the wrapped callable raises; the exception propagates.
    """

    def log_excess_runtime_decorator(func):
        @wraps(func)
        def _new_func(*args, **kwargs):
            # Monotonic clock: elapsed time stays correct even if the system
            # wall clock is adjusted while the call is in progress.
            start_time = time.monotonic()
            return_value = func(*args, **kwargs)
            delta = time.monotonic() - start_time
            if delta > cutoff:
                # Log through the logger handed to the decorator rather than
                # a module-level one, so the caller's choice is honored.
                func_logger.info(f'Running {func.__name__!r} took {delta:.2f}s')
            return return_value

        return _new_func

    return log_excess_runtime_decorator

View File

@@ -432,6 +432,10 @@ os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199')
# heartbeat period can factor into some forms of logic, so it is maintained as a setting here # heartbeat period can factor into some forms of logic, so it is maintained as a setting here
CLUSTER_NODE_HEARTBEAT_PERIOD = 60 CLUSTER_NODE_HEARTBEAT_PERIOD = 60
# Number of missed heartbeats until a node gets marked as lost
CLUSTER_NODE_MISSED_HEARTBEAT_TOLERANCE = 2
RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD = 60 # https://github.com/ansible/receptor/blob/aa1d589e154d8a0cb99a220aff8f98faf2273be6/pkg/netceptor/netceptor.go#L34 RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD = 60 # https://github.com/ansible/receptor/blob/aa1d589e154d8a0cb99a220aff8f98faf2273be6/pkg/netceptor/netceptor.go#L34
EXECUTION_NODE_REMEDIATION_CHECKS = 60 * 30 # once every 30 minutes check if an execution node errors have been resolved EXECUTION_NODE_REMEDIATION_CHECKS = 60 * 30 # once every 30 minutes check if an execution node errors have been resolved
@@ -1028,3 +1032,14 @@ DEFAULT_CONTAINER_RUN_OPTIONS = ['--network', 'slirp4netns:enable_ipv6=true']
# Mount exposed paths as hostPath resource in k8s/ocp # Mount exposed paths as hostPath resource in k8s/ocp
AWX_MOUNT_ISOLATED_PATHS_ON_K8S = False AWX_MOUNT_ISOLATED_PATHS_ON_K8S = False
# Time out task managers if they take longer than this many seconds
TASK_MANAGER_TIMEOUT = 300
# Number of seconds _in addition to_ the task manager timeout a job can stay
# in waiting without being reaped
JOB_WAITING_GRACE_PERIOD = 60
# Number of seconds to wait after a container group job finishes
# before the awx_k8s_reaper task will tear down its pods
K8S_POD_REAPER_GRACE_PERIOD = 60