From 5209bfcf82ea6694e34f61b5d6613ac5809e0dc4 Mon Sep 17 00:00:00 2001 From: thedoubl3j Date: Wed, 7 Jan 2026 15:56:23 -0500 Subject: [PATCH] add back auto_max_workers * added back get_auto_max_workers into common utils * formatting edits --- awx/main/dispatch/config.py | 2 +- awx/main/dispatch/control.py | 1 - awx/main/dispatch/periodic.py | 146 ---------------------------------- awx/main/dispatch/pool.py | 2 - awx/main/utils/common.py | 38 +++++++++ 5 files changed, 39 insertions(+), 150 deletions(-) delete mode 100644 awx/main/dispatch/periodic.py diff --git a/awx/main/dispatch/config.py b/awx/main/dispatch/config.py index 9f5773e153..3809c93599 100644 --- a/awx/main/dispatch/config.py +++ b/awx/main/dispatch/config.py @@ -2,7 +2,7 @@ from django.conf import settings from ansible_base.lib.utils.db import get_pg_notify_params from awx.main.dispatch import get_task_queuename -from awx.main.dispatch.pool import get_auto_max_workers +from awx.main.utils.common import get_auto_max_workers def get_dispatcherd_config(for_service: bool = False, mock_publish: bool = False) -> dict: diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py index 8c44231bf9..976d7ae44f 100644 --- a/awx/main/dispatch/control.py +++ b/awx/main/dispatch/control.py @@ -36,4 +36,3 @@ class Control(object): @classmethod def generate_reply_queue_name(cls): return f"reply_to_{str(uuid.uuid4()).replace('-','_')}" - diff --git a/awx/main/dispatch/periodic.py b/awx/main/dispatch/periodic.py deleted file mode 100644 index 0d3229cd91..0000000000 --- a/awx/main/dispatch/periodic.py +++ /dev/null @@ -1,146 +0,0 @@ -import logging -import time -import yaml -from datetime import datetime - -logger = logging.getLogger('awx.main.dispatch.periodic') - - -class ScheduledTask: - """ - Class representing schedules, very loosely modeled after python schedule library Job - the idea of this class is to: - - only deal in relative times (time since the scheduler global start) - - only deal in integer 
math for target runtimes, but float for current relative time - - Missed schedule policy: - Invariant target times are maintained, meaning that if interval=10s offset=0 - and it runs at t=7s, then it calls for next run in 3s. - However, if a complete interval has passed, that is counted as a missed run, - and missed runs are abandoned (no catch-up runs). - """ - - def __init__(self, name: str, data: dict): - # parameters need for schedule computation - self.interval = int(data['schedule'].total_seconds()) - self.offset = 0 # offset relative to start time this schedule begins - self.index = 0 # number of periods of the schedule that has passed - - # parameters that do not affect scheduling logic - self.last_run = None # time of last run, only used for debug - self.completed_runs = 0 # number of times schedule is known to run - self.name = name - self.data = data # used by caller to know what to run - - @property - def next_run(self): - "Time until the next run with t=0 being the global_start of the scheduler class" - return (self.index + 1) * self.interval + self.offset - - def due_to_run(self, relative_time): - return bool(self.next_run <= relative_time) - - def expected_runs(self, relative_time): - return int((relative_time - self.offset) / self.interval) - - def mark_run(self, relative_time): - self.last_run = relative_time - self.completed_runs += 1 - new_index = self.expected_runs(relative_time) - if new_index > self.index + 1: - logger.warning(f'Missed {new_index - self.index - 1} schedules of {self.name}') - self.index = new_index - - def missed_runs(self, relative_time): - "Number of times job was supposed to ran but failed to, only used for debug" - missed_ct = self.expected_runs(relative_time) - self.completed_runs - # if this is currently due to run do not count that as a missed run - if missed_ct and self.due_to_run(relative_time): - missed_ct -= 1 - return missed_ct - - -class Scheduler: - def __init__(self, schedule): - """ - Expects schedule in the 
form of a dictionary like - { - 'job1': {'schedule': timedelta(seconds=50), 'other': 'stuff'} - } - Only the schedule nearest-second value is used for scheduling, - the rest of the data is for use by the caller to know what to run. - """ - self.jobs = [ScheduledTask(name, data) for name, data in schedule.items()] - min_interval = min(job.interval for job in self.jobs) - num_jobs = len(self.jobs) - - # this is intentionally oppioniated against spammy schedules - # a core goal is to spread out the scheduled tasks (for worker management) - # and high-frequency schedules just do not work with that - if num_jobs > min_interval: - raise RuntimeError(f'Number of schedules ({num_jobs}) is more than the shortest schedule interval ({min_interval} seconds).') - - # even space out jobs over the base interval - for i, job in enumerate(self.jobs): - job.offset = (i * min_interval) // num_jobs - - # internally times are all referenced relative to startup time, add grace period - self.global_start = time.time() + 2.0 - - def get_and_mark_pending(self, reftime=None): - if reftime is None: - reftime = time.time() # mostly for tests - relative_time = reftime - self.global_start - to_run = [] - for job in self.jobs: - if job.due_to_run(relative_time): - to_run.append(job) - logger.debug(f'scheduler found {job.name} to run, {relative_time - job.next_run} seconds after target') - job.mark_run(relative_time) - return to_run - - def time_until_next_run(self, reftime=None): - if reftime is None: - reftime = time.time() # mostly for tests - relative_time = reftime - self.global_start - next_job = min(self.jobs, key=lambda j: j.next_run) - delta = next_job.next_run - relative_time - if delta <= 0.1: - # careful not to give 0 or negative values to the select timeout, which has unclear interpretation - logger.warning(f'Scheduler next run of {next_job.name} is {-delta} seconds in the past') - return 0.1 - elif delta > 20.0: - logger.warning(f'Scheduler next run unexpectedly over 20 seconds in 
future: {delta}') - return 20.0 - logger.debug(f'Scheduler next run is {next_job.name} in {delta} seconds') - return delta - - def debug(self, *args, **kwargs): - data = dict() - data['title'] = 'Scheduler status' - reftime = time.time() - - now = datetime.fromtimestamp(reftime).strftime('%Y-%m-%d %H:%M:%S UTC') - start_time = datetime.fromtimestamp(self.global_start).strftime('%Y-%m-%d %H:%M:%S UTC') - relative_time = reftime - self.global_start - data['started_time'] = start_time - data['current_time'] = now - data['current_time_relative'] = round(relative_time, 3) - data['total_schedules'] = len(self.jobs) - - data['schedule_list'] = dict( - [ - ( - job.name, - dict( - last_run_seconds_ago=round(relative_time - job.last_run, 3) if job.last_run else None, - next_run_in_seconds=round(job.next_run - relative_time, 3), - offset_in_seconds=job.offset, - completed_runs=job.completed_runs, - missed_runs=job.missed_runs(relative_time), - ), - ) - for job in sorted(self.jobs, key=lambda job: job.interval) - ] - ) - return yaml.safe_dump(data, default_flow_style=False, sort_keys=False) diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index b447b1e2bc..a7162e9c40 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -24,7 +24,6 @@ else: logger = logging.getLogger('awx.main.dispatch') - class NoOpResultQueue(object): def put(self, item): pass @@ -306,4 +305,3 @@ class WorkerPool(object): os.kill(worker.pid, signum) except Exception: logger.exception('could not kill {}'.format(worker.pid)) - diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 2f45bb7c8f..4365887be2 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -43,6 +43,9 @@ from django.apps import apps # AWX from awx.conf.license import get_license +# ansible-runner +from ansible_runner.utils.capacity import get_mem_in_bytes, get_cpu_count + logger = logging.getLogger('awx.main.utils') __all__ = [ @@ -1220,3 +1223,38 @@ def 
unified_job_class_to_event_table_name(job_class):
 
 def load_all_entry_points_for(entry_point_subsections: list[str], /) -> dict[str, EntryPoint]:
     return {ep.name: ep for entry_point_category in entry_point_subsections for ep in entry_points(group=f'awx_plugins.{entry_point_category}')}
+
+
+def get_auto_max_workers():
+    """Compute the max_workers value we normally rely on, from memory and CPU capacity.
+
+    Uses almost the same logic as Instance.local_health_check.
+    The important thing is to be MORE than Instance.capacity
+    so that the task-manager does not over-schedule this node.
+
+    Ideally we would just use the capacity from the database plus reserve workers,
+    but this poses some bootstrap problems where OCP task containers
+    register themselves after startup.
+    """
+    # Get memory from ansible-runner; the value is in bytes, not gigabytes
+    total_memory_bytes = get_mem_in_bytes()
+
+    # This may replace memory calculation with a user override
+    corrected_memory = get_corrected_memory(total_memory_bytes)
+
+    # Get same number as max forks based on memory, this function takes memory as bytes
+    mem_capacity = get_mem_effective_capacity(corrected_memory, is_control_node=True)
+
+    # Follow same process for CPU capacity constraint
+    cpu_count = get_cpu_count()
+    corrected_cpu = get_corrected_cpu(cpu_count)
+    cpu_capacity = get_cpu_effective_capacity(corrected_cpu, is_control_node=True)
+
+    # Here is what is different from health checks: use the larger of the two constraints
+    auto_max = max(mem_capacity, cpu_capacity)
+
+    # add magic number of extra workers to ensure
+    # we have a few extra workers to run the heartbeat
+    auto_max += 7
+
+    return auto_max