From a3fef27002907bb8af04ade135f75feca4780674 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Wed, 13 Jul 2022 09:39:58 -0400 Subject: [PATCH 1/9] Add logs to debug waiting bottlenecking --- awx/main/dispatch/pool.py | 18 ++++++++++++------ awx/main/dispatch/publish.py | 3 ++- awx/main/dispatch/worker/base.py | 5 +++++ awx/main/dispatch/worker/callback.py | 1 - awx/main/dispatch/worker/task.py | 14 +++++++++++++- awx/main/utils/common.py | 17 +++++++++++++++++ 6 files changed, 49 insertions(+), 9 deletions(-) diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index d9a3f36324..f700656ea4 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -22,7 +22,7 @@ import psutil from awx.main.models import UnifiedJob from awx.main.dispatch import reaper -from awx.main.utils.common import convert_mem_str_to_bytes, get_mem_effective_capacity +from awx.main.utils.common import convert_mem_str_to_bytes, get_mem_effective_capacity, log_excess_runtime if 'run_callback_receiver' in sys.argv: logger = logging.getLogger('awx.main.commands.run_callback_receiver') @@ -364,6 +364,7 @@ class AutoscalePool(WorkerPool): def debug_meta(self): return 'min={} max={}'.format(self.min_workers, self.max_workers) + @log_excess_runtime(logger) def cleanup(self): """ Perform some internal account and cleanup. This is run on @@ -380,6 +381,7 @@ class AutoscalePool(WorkerPool): if there's an outage, this method _can_ throw various django.db.utils.Error exceptions. Act accordingly. 
""" + start_time = time.time() orphaned = [] for w in self.workers[::]: if not w.alive: @@ -432,16 +434,16 @@ class AutoscalePool(WorkerPool): idx = random.choice(range(len(self.workers))) self.write(idx, m) - # if we are not in the dangerous situation of queue backup then clear old waiting jobs - if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1: - reaper.reap_waiting() - - # if the database says a job is running on this node, but it's *not*, + # if the database says a job is running or queued on this node, but it's *not*, # then reap it running_uuids = [] for worker in self.workers: worker.calculate_managed_tasks() running_uuids.extend(list(worker.managed_tasks.keys())) + delta = time.time() - start_time + if delta > 1.0: + logger.warning(f'Took {delta} for internal part of cleanup') + start_time = time.time() reaper.reap(excluded_uuids=running_uuids) def up(self): @@ -471,6 +473,10 @@ class AutoscalePool(WorkerPool): w.put(body) break else: + task_name = 'unknown' + if isinstance(body, dict): + task_name = body.get('task') + logger.warn(f'Workers maxed, queuing {task_name}, load: {sum(len(w.managed_tasks) for w in self.workers)} / {len(self.workers)}') return super(AutoscalePool, self).write(preferred_queue, body) except Exception: for conn in connections.all(): diff --git a/awx/main/dispatch/publish.py b/awx/main/dispatch/publish.py index e873465155..dd19c1338c 100644 --- a/awx/main/dispatch/publish.py +++ b/awx/main/dispatch/publish.py @@ -2,6 +2,7 @@ import inspect import logging import sys import json +import time from uuid import uuid4 from django.conf import settings @@ -75,7 +76,7 @@ class task: msg = f'{cls.name}: Queue value required and may not be None' logger.error(msg) raise ValueError(msg) - obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name} + obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name, 'time_pub': time.time()} guid = get_guid() if guid: obj['guid'] = guid diff --git 
a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index 46418828b6..b982cb8ab4 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -17,6 +17,7 @@ from django.conf import settings from awx.main.dispatch.pool import WorkerPool from awx.main.dispatch import pg_bus_conn +from awx.main.utils.common import log_excess_runtime if 'run_callback_receiver' in sys.argv: logger = logging.getLogger('awx.main.commands.run_callback_receiver') @@ -81,6 +82,9 @@ class AWXConsumerBase(object): logger.error('unrecognized control message: {}'.format(control)) def process_task(self, body): + if isinstance(body, dict): + body['time_ack'] = time.time() + if 'control' in body: try: return self.control(body) @@ -101,6 +105,7 @@ class AWXConsumerBase(object): self.total_messages += 1 self.record_statistics() + @log_excess_runtime(logger) def record_statistics(self): if time.time() - self.last_stats > 1: # buffer stat recording to once per second try: diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index 68915921fb..0578a4ff97 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -183,7 +183,6 @@ class CallbackBrokerWorker(BaseWorker): except Exception as exc_indv: consecutive_errors += 1 logger.info(f'Database Error Saving individual Job Event, error {str(exc_indv)}') - if consecutive_errors >= 5: raise metrics_singular_events_saved += events_saved diff --git a/awx/main/dispatch/worker/task.py b/awx/main/dispatch/worker/task.py index e1fe196ddb..04f63002c5 100644 --- a/awx/main/dispatch/worker/task.py +++ b/awx/main/dispatch/worker/task.py @@ -3,6 +3,7 @@ import logging import importlib import sys import traceback +import time from kubernetes.config import kube_config @@ -60,8 +61,19 @@ class TaskWorker(BaseWorker): # the callable is a class, e.g., RunJob; instantiate and # return its `run()` method _call = _call().run + + log_extra = '' + logger_method = 
logger.debug + if ('time_ack' in body) and ('time_pub' in body): + time_publish = body['time_ack'] - body['time_pub'] + time_waiting = time.time() - body['time_ack'] + if time_waiting > 5.0 or time_publish > 5.0: + # If task took a very long time to process, add this information to the log + log_extra = f' took {time_publish:.4f} to ack, {time_waiting:.4f} in local dispatcher' + logger_method = logger.info # don't print kwargs, they often contain launch-time secrets - logger.debug('task {} starting {}(*{})'.format(uuid, task, args)) + logger_method(f'task {uuid} starting {task}(*{args}){log_extra}') + return _call(*args, **kwargs) def perform_work(self, body): diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 29ebff1178..627e38a1fb 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -6,6 +6,7 @@ from datetime import timedelta import json import yaml import logging +import time import os import subprocess import re @@ -1183,3 +1184,19 @@ def cleanup_new_process(func): return func(*args, **kwargs) return wrapper_cleanup_new_process + + +def log_excess_runtime(func_logger, cutoff=5.0): + def log_excess_runtime_decorator(func): + @wraps(func) + def _new_func(*args, **kwargs): + start_time = time.time() + return_value = func(*args, **kwargs) + delta = time.time() - start_time + if delta > cutoff: + logger.info(f'Running {func.__name__!r} took {delta:.2f}s') + return return_value + + return _new_func + + return log_excess_runtime_decorator From 43a53f41dd25f23e2f18c5cab1d903c2facb8bfa Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Thu, 14 Jul 2022 12:39:30 -0400 Subject: [PATCH 2/9] Add logs about heartbeat skew Co-authored-by: Shane McDonald --- awx/main/tasks/system.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index 932c8abe5b..7f9e157d8c 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -506,10 +506,15 @@ def
cluster_node_heartbeat(): if this_inst: startup_event = this_inst.is_lost(ref_time=nowtime) + last_last_seen = this_inst.last_seen this_inst.local_health_check() if startup_event and this_inst.capacity != 0: - logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname)) + logger.warning(f'Rejoining the cluster as instance {this_inst.hostname}. Prior last_seen {last_last_seen}') return + elif not last_last_seen: + logger.warning(f'Instance does not have recorded last_seen, updating to {nowtime}') + elif (nowtime - last_last_seen) > timedelta(seconds=settings.CLUSTER_NODE_HEARTBEAT_PERIOD + 2): + logger.warning(f'Heartbeat skew - interval={(nowtime - last_last_seen).total_seconds():.4f}, expected={settings.CLUSTER_NODE_HEARTBEAT_PERIOD}') else: if settings.AWX_AUTO_DEPROVISION_INSTANCES: (changed, this_inst) = Instance.objects.register(ip_address=os.environ.get('MY_POD_IP'), node_type='control', uuid=settings.SYSTEM_UUID) From c649809eb2313c6f2f90254638eba289f1bf7d95 Mon Sep 17 00:00:00 2001 From: Shane McDonald Date: Wed, 13 Jul 2022 14:56:17 -0400 Subject: [PATCH 3/9] Remove debug method that calls cleanup - It's unclear why this was here. - Removing it doesn't appear to cause any problems. - It still gets called during heartbeats.
--- awx/main/dispatch/pool.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index f700656ea4..a102ff0b03 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -344,10 +344,6 @@ class AutoscalePool(WorkerPool): # max workers can't be less than min_workers self.max_workers = max(self.min_workers, self.max_workers) - def debug(self, *args, **kwargs): - self.cleanup() - return super(AutoscalePool, self).debug(*args, **kwargs) - @property def should_grow(self): if len(self.workers) < self.min_workers: From 3c51cb130f77c6f7e2d813d9870ed2841d4fca85 Mon Sep 17 00:00:00 2001 From: Shane McDonald Date: Wed, 13 Jul 2022 16:40:28 -0400 Subject: [PATCH 4/9] Add grace period settings for task manager timeout, and pod / job waiting reapers Co-authored-by: Alan Rominger --- awx/main/dispatch/pool.py | 18 +++++++++++------- awx/main/dispatch/reaper.py | 20 +++++++++++++++----- awx/main/tasks/jobs.py | 5 ++++- awx/main/tasks/system.py | 5 +++-- awx/settings/defaults.py | 11 +++++++++++ 5 files changed, 44 insertions(+), 15 deletions(-) diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index a102ff0b03..c9a60f2265 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -344,6 +344,10 @@ class AutoscalePool(WorkerPool): # max workers can't be less than min_workers self.max_workers = max(self.min_workers, self.max_workers) + # the task manager enforces settings.TASK_MANAGER_TIMEOUT on its own + # but if the task takes longer than the time defined here, we will force it to stop here + self.task_manager_timeout = settings.TASK_MANAGER_TIMEOUT + settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD + @property def should_grow(self): if len(self.workers) < self.min_workers: @@ -377,7 +381,6 @@ class AutoscalePool(WorkerPool): if there's an outage, this method _can_ throw various django.db.utils.Error exceptions. Act accordingly. 
""" - start_time = time.time() orphaned = [] for w in self.workers[::]: if not w.alive: @@ -419,8 +422,8 @@ class AutoscalePool(WorkerPool): w.managed_tasks[current_task['uuid']]['started'] = time.time() age = time.time() - current_task['started'] w.managed_tasks[current_task['uuid']]['age'] = age - if age > (settings.TASK_MANAGER_TIMEOUT + settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD): - logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGTERM to {w.pid}') # noqa + if age > self.task_manager_timeout: + logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGTERM to {w.pid}') os.kill(w.pid, signal.SIGTERM) for m in orphaned: @@ -436,10 +439,11 @@ class AutoscalePool(WorkerPool): for worker in self.workers: worker.calculate_managed_tasks() running_uuids.extend(list(worker.managed_tasks.keys())) - delta = time.time() - start_time - if delta > 1.0: - logger.warning(f'Took {delta} for internal part of cleanup') - start_time = time.time() + + # if we are not in the dangerous situation of queue backup then clear old waiting jobs + if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1: + reaper.reap_waiting(excluded_uuids=running_uuids) + reaper.reap(excluded_uuids=running_uuids) def up(self): diff --git a/awx/main/dispatch/reaper.py b/awx/main/dispatch/reaper.py index fc67b5762f..243ae3b715 100644 --- a/awx/main/dispatch/reaper.py +++ b/awx/main/dispatch/reaper.py @@ -2,6 +2,7 @@ from datetime import timedelta import logging from django.db.models import Q +from django.conf import settings from django.utils.timezone import now as tz_now from django.contrib.contenttypes.models import ContentType @@ -34,7 +35,9 @@ def startup_reaping(): def reap_job(j, status): - if UnifiedJob.objects.get(id=j.id).status not in ('running', 'waiting'): + j.refresh_from_db(fields=['status']) + status_before = j.status + if status_before not in ('running', 'waiting'): # just in case, don't reap jobs that aren't 
running return j.status = status @@ -49,13 +52,16 @@ def reap_job(j, status): if hasattr(j, 'send_notification_templates'): j.send_notification_templates('failed') j.websocket_emit_status(status) - logger.error('{} is no longer running; reaping'.format(j.log_format)) + logger.error(f'{j.log_format} is no longer {status_before}; reaping') -def reap_waiting(instance=None, status='failed', grace_period=60): +def reap_waiting(instance=None, status='failed', grace_period=None, excluded_uuids=None): """ Reap all jobs in waiting for this instance. """ + if grace_period is None: + grace_period = settings.JOB_WAITING_GRACE_PERIOD + settings.TASK_MANAGER_TIMEOUT + me = instance if me is None: try: @@ -65,11 +71,13 @@ def reap_waiting(instance=None, status='failed', grace_period=60): return now = tz_now() jobs = UnifiedJob.objects.filter(status='waiting', modified__lte=now - timedelta(seconds=grace_period), controller_node=me.hostname) + if excluded_uuids: + jobs = jobs.exclude(celery_task_id__in=excluded_uuids) for j in jobs: reap_job(j, status) -def reap(instance=None, status='failed', excluded_uuids=[]): +def reap(instance=None, status='failed', excluded_uuids=None): """ Reap all jobs in running for this instance. 
""" @@ -83,6 +91,8 @@ def reap(instance=None, status='failed', excluded_uuids=[]): workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id jobs = UnifiedJob.objects.filter( Q(status='running') & (Q(execution_node=me.hostname) | Q(controller_node=me.hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id) - ).exclude(celery_task_id__in=excluded_uuids) + ) + if excluded_uuids: + jobs = jobs.exclude(celery_task_id__in=excluded_uuids) for j in jobs: reap_job(j, status) diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py index 954ced13fd..94ce454b44 100644 --- a/awx/main/tasks/jobs.py +++ b/awx/main/tasks/jobs.py @@ -609,7 +609,10 @@ class BaseTask(object): elif status == 'canceled': self.instance = self.update_model(pk) if (getattr(self.instance, 'cancel_flag', False) is False) and signal_callback(): - self.runner_callback.delay_update(job_explanation="Task was canceled due to receiving a shutdown signal.") + # MERGE: prefer devel over this with runner_callback.delay_update() + job_explanation = "Task was canceled due to receiving a shutdown signal." 
+ self.instance.job_explanation = self.instance.job_explanation or job_explanation + extra_update_fields['job_explanation'] = self.instance.job_explanation status = 'failed' except ReceptorNodeNotFound as exc: self.runner_callback.delay_update(job_explanation=str(exc)) diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index 7f9e157d8c..fc50938ece 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -15,7 +15,7 @@ from distutils.version import LooseVersion as Version from django.conf import settings from django.db import transaction, DatabaseError, IntegrityError from django.db.models.fields.related import ForeignKey -from django.utils.timezone import now +from django.utils.timezone import now, timedelta from django.utils.encoding import smart_str from django.contrib.auth.models import User from django.utils.translation import gettext_lazy as _ @@ -608,7 +608,8 @@ def awx_k8s_reaper(): for group in InstanceGroup.objects.filter(is_container_group=True).iterator(): logger.debug("Checking for orphaned k8s pods for {}.".format(group)) pods = PodManager.list_active_jobs(group) - for job in UnifiedJob.objects.filter(pk__in=pods.keys()).exclude(status__in=ACTIVE_STATES): + time_cutoff = now() - timedelta(seconds=settings.K8S_POD_REAPER_GRACE_PERIOD) + for job in UnifiedJob.objects.filter(pk__in=pods.keys(), finished__lte=time_cutoff).exclude(status__in=ACTIVE_STATES): logger.debug('{} is no longer active, reaping orphaned k8s pod'.format(job.log_format)) try: pm = PodManager(job) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index bf20f8b9f8..499225a17d 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -1028,3 +1028,14 @@ DEFAULT_CONTAINER_RUN_OPTIONS = ['--network', 'slirp4netns:enable_ipv6=true'] # Mount exposed paths as hostPath resource in k8s/ocp AWX_MOUNT_ISOLATED_PATHS_ON_K8S = False + +# Time out task managers if they take longer than this many seconds +TASK_MANAGER_TIMEOUT = 300 + +# Number 
of seconds _in addition to_ the task manager timeout a job can stay +# in waiting without being reaped +JOB_WAITING_GRACE_PERIOD = 60 + +# Number of seconds after a container group job finished time to wait +# before the awx_k8s_reaper task will tear down the pods +K8S_POD_REAPER_GRACE_PERIOD = 60 From c5976e2584994f9a364ce1e1e3126714b33f89f9 Mon Sep 17 00:00:00 2001 From: Shane McDonald Date: Sat, 23 Jul 2022 13:22:26 -0400 Subject: [PATCH 5/9] Add setting for missed heartbeats before marking node offline --- awx/main/models/ha.py | 2 +- awx/settings/defaults.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 3a6b7740a2..5f9588f627 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -207,7 +207,7 @@ class Instance(HasPolicyEditsMixin, BaseModel): return True if ref_time is None: ref_time = now() - grace_period = settings.CLUSTER_NODE_HEARTBEAT_PERIOD * 2 + grace_period = settings.CLUSTER_NODE_HEARTBEAT_PERIOD * settings.CLUSTER_NODE_MISSED_HEARTBEAT_TOLERANCE if self.node_type in ('execution', 'hop'): grace_period += settings.RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD return self.last_seen < ref_time - timedelta(seconds=grace_period) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 499225a17d..a24fe6f090 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -432,6 +432,10 @@ os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199') # heartbeat period can factor into some forms of logic, so it is maintained as a setting here CLUSTER_NODE_HEARTBEAT_PERIOD = 60 + +# Number of missed heartbeats until a node gets marked as lost +CLUSTER_NODE_MISSED_HEARTBEAT_TOLERANCE = 2 + RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD = 60 # https://github.com/ansible/receptor/blob/aa1d589e154d8a0cb99a220aff8f98faf2273be6/pkg/netceptor/netceptor.go#L34 EXECUTION_NODE_REMEDIATION_CHECKS = 60 * 30 # once every 30 minutes check if an execution node 
errors have been resolved From 16be38bb544b05a8ea40827d52d6102b1415b6a3 Mon Sep 17 00:00:00 2001 From: Shane McDonald Date: Thu, 14 Jul 2022 13:11:14 -0400 Subject: [PATCH 6/9] Allow for passing custom job_explanation to reaper methods Co-authored-by: Alan Rominger --- awx/main/dispatch/reaper.py | 24 ++++++++++++------------ awx/main/tasks/system.py | 5 +++-- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/awx/main/dispatch/reaper.py b/awx/main/dispatch/reaper.py index 243ae3b715..7a0ae1b884 100644 --- a/awx/main/dispatch/reaper.py +++ b/awx/main/dispatch/reaper.py @@ -34,20 +34,20 @@ def startup_reaping(): logger.error(f'Unified jobs {job_ids} were reaped on dispatch startup') -def reap_job(j, status): - j.refresh_from_db(fields=['status']) +def reap_job(j, status, job_explanation=None): + j.refresh_from_db(fields=['status', 'job_explanation']) status_before = j.status if status_before not in ('running', 'waiting'): # just in case, don't reap jobs that aren't running return j.status = status j.start_args = '' # blank field to remove encrypted passwords - j.job_explanation += ' '.join( - ( - 'Task was marked as running but was not present in', - 'the job queue, so it has been marked as failed.', - ) - ) + if j.job_explanation: + j.job_explanation += ' ' # Separate messages for readability + if job_explanation is None: + j.job_explanation += 'Task was marked as running but was not present in the job queue, so it has been marked as failed.' 
+ else: + j.job_explanation += job_explanation j.save(update_fields=['status', 'start_args', 'job_explanation']) if hasattr(j, 'send_notification_templates'): j.send_notification_templates('failed') @@ -55,7 +55,7 @@ def reap_job(j, status): logger.error(f'{j.log_format} is no longer {status_before}; reaping') -def reap_waiting(instance=None, status='failed', grace_period=None, excluded_uuids=None): +def reap_waiting(instance=None, status='failed', job_explanation=None, grace_period=None, excluded_uuids=None): """ Reap all jobs in waiting for this instance. """ @@ -74,10 +74,10 @@ def reap_waiting(instance=None, status='failed', grace_period=None, excluded_uui if excluded_uuids: jobs = jobs.exclude(celery_task_id__in=excluded_uuids) for j in jobs: - reap_job(j, status) + reap_job(j, status, job_explanation=job_explanation) -def reap(instance=None, status='failed', excluded_uuids=None): +def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=None): """ Reap all jobs in running for this instance. 
""" @@ -95,4 +95,4 @@ def reap(instance=None, status='failed', excluded_uuids=None): if excluded_uuids: jobs = jobs.exclude(celery_task_id__in=excluded_uuids) for j in jobs: - reap_job(j, status) + reap_job(j, status, job_explanation=job_explanation) diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index fc50938ece..e36c502400 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -542,8 +542,9 @@ def cluster_node_heartbeat(): for other_inst in lost_instances: try: - reaper.reap(other_inst) - reaper.reap_waiting(this_inst, grace_period=0) + explanation = "Job reaped due to instance shutdown" + reaper.reap(other_inst, job_explanation=explanation) + reaper.reap_waiting(other_inst, grace_period=0, job_explanation=explanation) except Exception: logger.exception('failed to reap jobs for {}'.format(other_inst.hostname)) try: From 621833ef0ee56b1a5ff4ae03c99555f583205ff6 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Thu, 14 Jul 2022 15:54:44 -0400 Subject: [PATCH 7/9] Add extra workers if computing based on memory Co-authored-by: Elijah DeLee --- awx/main/dispatch/pool.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index c9a60f2265..ce6c297861 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -341,6 +341,10 @@ class AutoscalePool(WorkerPool): # Get same number as max forks based on memory, this function takes memory as bytes self.max_workers = get_mem_effective_capacity(total_memory_gb * 2**30) + # add magic prime number of extra workers to ensure + # we have a few extra workers to run the heartbeat + self.max_workers += 7 + # max workers can't be less than min_workers self.max_workers = max(self.min_workers, self.max_workers) From 1ea3c564df167c481684c119c3dc37eadeb05591 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Tue, 19 Jul 2022 16:18:45 -0400 Subject: [PATCH 8/9] Apply a failed status if cancel_flag is not set --- awx/main/tasks/jobs.py 
| 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py index 94ce454b44..aeba4cc254 100644 --- a/awx/main/tasks/jobs.py +++ b/awx/main/tasks/jobs.py @@ -608,12 +608,18 @@ class BaseTask(object): status = 'failed' elif status == 'canceled': self.instance = self.update_model(pk) - if (getattr(self.instance, 'cancel_flag', False) is False) and signal_callback(): - # MERGE: prefer devel over this with runner_callback.delay_update() + cancel_flag_value = getattr(self.instance, 'cancel_flag', False) + if (cancel_flag_value is False) and signal_callback(): + # MERGE: prefer devel over this with runner_callback.delay_update(), and for elif case too job_explanation = "Task was canceled due to receiving a shutdown signal." self.instance.job_explanation = self.instance.job_explanation or job_explanation extra_update_fields['job_explanation'] = self.instance.job_explanation status = 'failed' + elif cancel_flag_value is False: + job_explanation = "The running ansible process received a shutdown signal." 
+ self.instance.job_explanation = self.instance.job_explanation or job_explanation + extra_update_fields['job_explanation'] = self.instance.job_explanation + status = 'failed' except ReceptorNodeNotFound as exc: self.runner_callback.delay_update(job_explanation=str(exc)) except Exception: From 56739ac246507f5ddece4a6b6b6952459ef9e5c6 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Wed, 17 Aug 2022 11:45:40 -0400 Subject: [PATCH 9/9] Use delay_update to set error message, according to merge note --- awx/main/tasks/jobs.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py index aeba4cc254..774469c3ba 100644 --- a/awx/main/tasks/jobs.py +++ b/awx/main/tasks/jobs.py @@ -610,15 +610,10 @@ class BaseTask(object): self.instance = self.update_model(pk) cancel_flag_value = getattr(self.instance, 'cancel_flag', False) if (cancel_flag_value is False) and signal_callback(): - # MERGE: prefer devel over this with runner_callback.delay_update(), and for elif case too - job_explanation = "Task was canceled due to receiving a shutdown signal." - self.instance.job_explanation = self.instance.job_explanation or job_explanation - extra_update_fields['job_explanation'] = self.instance.job_explanation + self.runner_callback.delay_update(skip_if_already_set=True, job_explanation="Task was canceled due to receiving a shutdown signal.") status = 'failed' elif cancel_flag_value is False: - job_explanation = "The running ansible process received a shutdown signal." - self.instance.job_explanation = self.instance.job_explanation or job_explanation - extra_update_fields['job_explanation'] = self.instance.job_explanation + self.runner_callback.delay_update(skip_if_already_set=True, job_explanation="The running ansible process received a shutdown signal.") status = 'failed' except ReceptorNodeNotFound as exc: self.runner_callback.delay_update(job_explanation=str(exc))