diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py
index d20c6317ac..8c44231bf9 100644
--- a/awx/main/dispatch/control.py
+++ b/awx/main/dispatch/control.py
@@ -33,45 +33,7 @@ class Control(object):
             workers.append(r.get(key).decode('utf-8'))
         return '\n'.join(workers)
 
-    def running(self, *args, **kwargs):
-        return self.control_with_reply('running', *args, **kwargs)
-
-    def cancel(self, task_ids, with_reply=True):
-        if with_reply:
-            return self.control_with_reply('cancel', extra_data={'task_ids': task_ids})
-        else:
-            self.control({'control': 'cancel', 'task_ids': task_ids, 'reply_to': None}, extra_data={'task_ids': task_ids})
-
-    def schedule(self, *args, **kwargs):
-        return self.control_with_reply('schedule', *args, **kwargs)
-
     @classmethod
     def generate_reply_queue_name(cls):
         return f"reply_to_{str(uuid.uuid4()).replace('-','_')}"
 
-    def control_with_reply(self, command, timeout=5, extra_data=None):
-        logger.warning('checking {} {} for {}'.format(self.service, command, self.queuename))
-        reply_queue = Control.generate_reply_queue_name()
-        self.result = None
-
-        if not connection.get_autocommit():
-            raise RuntimeError('Control-with-reply messages can only be done in autocommit mode')
-
-        with pg_bus_conn(select_timeout=timeout) as conn:
-            conn.listen(reply_queue)
-            send_data = {'control': command, 'reply_to': reply_queue}
-            if extra_data:
-                send_data.update(extra_data)
-            conn.notify(self.queuename, json.dumps(send_data))
-
-            for reply in conn.events(yield_timeouts=True):
-                if reply is None:
-                    logger.error(f'{self.service} did not reply within {timeout}s')
-                    raise RuntimeError(f"{self.service} did not reply within {timeout}s")
-                break
-
-        return json.loads(reply.payload)
-
-    def control(self, msg, **kwargs):
-        with pg_bus_conn() as conn:
-            conn.notify(self.queuename, json.dumps(msg))
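
For reviewers who have not worked with this code path: the deleted `Control.control_with_reply()` implemented a small request/reply protocol over `pg_notify`. The sketch below is a condensed restatement of the removed behavior, not new code in this change; the `pg_bus_conn` helper, the message shape, and the reply-queue naming all come from the deleted lines above.

```python
# Condensed restatement of the removed control-with-reply protocol: listen on a throwaway
# reply channel, pg_notify the service's queue, then block until one reply or the timeout.
import json
import uuid

from awx.main.dispatch import pg_bus_conn


def control_with_reply(queuename, command, timeout=5, extra_data=None):
    reply_queue = f"reply_to_{str(uuid.uuid4()).replace('-', '_')}"
    with pg_bus_conn(select_timeout=timeout) as conn:
        conn.listen(reply_queue)
        send_data = {'control': command, 'reply_to': reply_queue}
        if extra_data:
            send_data.update(extra_data)
        conn.notify(queuename, json.dumps(send_data))
        for reply in conn.events(yield_timeouts=True):
            if reply is None:
                raise RuntimeError(f'no reply within {timeout}s')
            return json.loads(reply.payload)
```
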
diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py
index 802a5d5da6..b447b1e2bc 100644
--- a/awx/main/dispatch/pool.py
+++ b/awx/main/dispatch/pool.py
@@ -1,13 +1,10 @@
 import logging
 import os
-import random
-import signal
 import sys
 import time
 import traceback
 from datetime import datetime, timezone
 from uuid import uuid4
-import json
 import collections
 
 from multiprocessing import Process
@@ -17,19 +14,9 @@ from queue import Full as QueueFull, Empty as QueueEmpty
 from django.conf import settings
 from django.db import connection as django_connection, connections
 from django.core.cache import cache as django_cache
-from django.utils.timezone import now as tz_now
-from django_guid import set_guid
 
 from jinja2 import Template
 import psutil
-from ansible_base.lib.logging.runtime import log_excess_runtime
-
-from awx.main.models import UnifiedJob
-from awx.main.dispatch import reaper
-from awx.main.utils.common import get_mem_effective_capacity, get_corrected_memory, get_corrected_cpu, get_cpu_effective_capacity
-
-# ansible-runner
-from ansible_runner.utils.capacity import get_mem_in_bytes, get_cpu_count
 
 if 'run_callback_receiver' in sys.argv:
     logger = logging.getLogger('awx.main.commands.run_callback_receiver')
@@ -37,8 +24,6 @@ else:
     logger = logging.getLogger('awx.main.dispatch')
 
 
-RETIRED_SENTINEL_TASK = "[retired]"
-
 
 class NoOpResultQueue(object):
     def put(self, item):
@@ -94,7 +79,7 @@ class PoolWorker(object):
         if self.retiring:
             uuid = body.get('uuid', 'N/A') if isinstance(body, dict) else 'N/A'
             logger.info(f"Worker pid:{self.pid} is retiring. Refusing new task {uuid}.")
-            raise QueueFull("Worker is retiring and not accepting new tasks")  # AutoscalePool.write handles QueueFull
+            raise QueueFull("Worker is retiring and not accepting new tasks")
         uuid = '?'
         if isinstance(body, dict):
             if not body.get('uuid'):
@@ -164,8 +149,6 @@ class PoolWorker(object):
             # the purpose of self.managed_tasks is to just track internal
             # state of which events are *currently* being processed.
             logger.warning('Event UUID {} appears to be have been duplicated.'.format(uuid))
-        if self.retiring:
-            self.managed_tasks[RETIRED_SENTINEL_TASK] = {'task': RETIRED_SENTINEL_TASK}
 
     @property
     def current_task(self):
@@ -213,10 +196,6 @@ class PoolWorker(object):
         return not self.busy
 
 
-class StatefulPoolWorker(PoolWorker):
-    track_managed_tasks = True
-
-
 class WorkerPool(object):
     """
     Creates a pool of forked PoolWorkers.
@@ -328,256 +307,3 @@ class WorkerPool(object):
             except Exception:
                 logger.exception('could not kill {}'.format(worker.pid))
 
-
-def get_auto_max_workers():
-    """Method we normally rely on to get max_workers
-
-    Uses almost same logic as Instance.local_health_check
-    The important thing is to be MORE than Instance.capacity
-    so that the task-manager does not over-schedule this node
-
-    Ideally we would just use the capacity from the database plus reserve workers,
-    but this poses some bootstrap problems where OCP task containers
-    register themselves after startup
-    """
-    # Get memory from ansible-runner
-    total_memory_gb = get_mem_in_bytes()
-
-    # This may replace memory calculation with a user override
-    corrected_memory = get_corrected_memory(total_memory_gb)
-
-    # Get same number as max forks based on memory, this function takes memory as bytes
-    mem_capacity = get_mem_effective_capacity(corrected_memory, is_control_node=True)
-
-    # Follow same process for CPU capacity constraint
-    cpu_count = get_cpu_count()
-    corrected_cpu = get_corrected_cpu(cpu_count)
-    cpu_capacity = get_cpu_effective_capacity(corrected_cpu, is_control_node=True)
-
-    # Here is what is different from health checks,
-    auto_max = max(mem_capacity, cpu_capacity)
-
-    # add magic number of extra workers to ensure
-    # we have a few extra workers to run the heartbeat
-    auto_max += 7
-
-    return auto_max
-
-
-class AutoscalePool(WorkerPool):
-    """
-    An extended pool implementation that automatically scales workers up and
-    down based on demand
-    """
-
-    pool_cls = StatefulPoolWorker
-
-    def __init__(self, *args, **kwargs):
-        self.max_workers = kwargs.pop('max_workers', None)
-        self.max_worker_lifetime_seconds = kwargs.pop(
-            'max_worker_lifetime_seconds', getattr(settings, 'WORKER_MAX_LIFETIME_SECONDS', 14400)
-        )  # Default to 4 hours
-        super(AutoscalePool, self).__init__(*args, **kwargs)
-
-        if self.max_workers is None:
-            self.max_workers = get_auto_max_workers()
-
-        # max workers can't be less than min_workers
-        self.max_workers = max(self.min_workers, self.max_workers)
-
-        # the task manager enforces settings.TASK_MANAGER_TIMEOUT on its own
-        # but if the task takes longer than the time defined here, we will force it to stop here
-        self.task_manager_timeout = settings.TASK_MANAGER_TIMEOUT + settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD
-
-        # initialize some things for subsystem metrics periodic gathering
-        # the AutoscalePool class does not save these to redis directly, but reports via produce_subsystem_metrics
-        self.scale_up_ct = 0
-        self.worker_count_max = 0
-
-        # last time we wrote current tasks, to avoid too much log spam
-        self.last_task_list_log = time.monotonic()
-
-    def produce_subsystem_metrics(self, metrics_object):
-        metrics_object.set('dispatcher_pool_scale_up_events', self.scale_up_ct)
-        metrics_object.set('dispatcher_pool_active_task_count', sum(len(w.managed_tasks) for w in self.workers))
-        metrics_object.set('dispatcher_pool_max_worker_count', self.worker_count_max)
-        self.worker_count_max = len(self.workers)
-
-    @property
-    def should_grow(self):
-        if len(self.workers) < self.min_workers:
-            # If we don't have at least min_workers, add more
-            return True
-        # If every worker is busy doing something, add more
-        return all([w.busy for w in self.workers])
-
-    @property
-    def full(self):
-        return len(self.workers) == self.max_workers
-
-    @property
-    def debug_meta(self):
-        return 'min={} max={}'.format(self.min_workers, self.max_workers)
-
-    @log_excess_runtime(logger, debug_cutoff=0.05, cutoff=0.2)
-    def cleanup(self):
-        """
-        Perform some internal account and cleanup. This is run on
-        every cluster node heartbeat:
-
-        1. Discover worker processes that exited, and recover messages they
-           were handling.
-        2. Clean up unnecessary, idle workers.
-
-        IMPORTANT: this function is one of the few places in the dispatcher
-        (aside from setting lookups) where we talk to the database. As such,
-        if there's an outage, this method _can_ throw various
-        django.db.utils.Error exceptions. Act accordingly.
-        """
-        orphaned = []
-        for w in self.workers[::]:
-            is_retirement_age = self.max_worker_lifetime_seconds is not None and w.age > self.max_worker_lifetime_seconds
-            if not w.alive:
-                # the worker process has exited
-                # 1. take the task it was running and enqueue the error
-                #    callbacks
-                # 2. take any pending tasks delivered to its queue and
-                #    send them to another worker
-                logger.error('worker pid:{} is gone (exit={})'.format(w.pid, w.exitcode))
-                if w.current_task:
-                    if w.current_task == {'task': RETIRED_SENTINEL_TASK}:
-                        logger.debug('scaling down worker pid:{} due to worker age: {}'.format(w.pid, w.age))
-                        self.workers.remove(w)
-                        continue
-                    if w.current_task != 'QUIT':
-                        try:
-                            for j in UnifiedJob.objects.filter(celery_task_id=w.current_task['uuid']):
-                                reaper.reap_job(j, 'failed')
-                        except Exception:
-                            logger.exception('failed to reap job UUID {}'.format(w.current_task['uuid']))
-                    else:
-                        logger.warning(f'Worker was told to quit but has not, pid={w.pid}')
-                orphaned.extend(w.orphaned_tasks)
-                self.workers.remove(w)
-
-            elif w.idle and len(self.workers) > self.min_workers:
-                # the process has an empty queue (it's idle) and we have
-                # more processes in the pool than we need (> min)
-                # send this process a message so it will exit gracefully
-                # at the next opportunity
-                logger.debug('scaling down worker pid:{}'.format(w.pid))
-                w.quit()
-                self.workers.remove(w)
-
-            elif w.idle and is_retirement_age:
-                logger.debug('scaling down worker pid:{} due to worker age: {}'.format(w.pid, w.age))
-                w.quit()
-                self.workers.remove(w)
-
-            elif is_retirement_age and not w.retiring and not w.idle:
-                logger.info(
-                    f"Worker pid:{w.pid} (age: {w.age:.0f}s) exceeded max lifetime ({self.max_worker_lifetime_seconds:.0f}s). "
-                    "Signaling for graceful retirement."
-                )
-                # Send QUIT signal; worker will finish current task then exit.
-                w.quit()
-                # mark as retiring to reject any future tasks that might be assigned in meantime
-                w.retiring = True
-
-            if w.alive:
-                # if we discover a task manager invocation that's been running
-                # too long, reap it (because otherwise it'll just hold the postgres
-                # advisory lock forever); the goal of this code is to discover
-                # deadlocks or other serious issues in the task manager that cause
-                # the task manager to never do more work
-                current_task = w.current_task
-                if current_task and isinstance(current_task, dict):
-                    endings = ('tasks.task_manager', 'tasks.dependency_manager', 'tasks.workflow_manager')
-                    current_task_name = current_task.get('task', '')
-                    if current_task_name.endswith(endings):
-                        if 'started' not in current_task:
-                            w.managed_tasks[current_task['uuid']]['started'] = time.time()
-                        age = time.time() - current_task['started']
-                        w.managed_tasks[current_task['uuid']]['age'] = age
-                        if age > self.task_manager_timeout:
-                            logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGUSR1 to {w.pid}')
-                            os.kill(w.pid, signal.SIGUSR1)
-
-        for m in orphaned:
-            # if all the workers are dead, spawn at least one
-            if not len(self.workers):
-                self.up()
-            idx = random.choice(range(len(self.workers)))
-            self.write(idx, m)
-
-    def add_bind_kwargs(self, body):
-        bind_kwargs = body.pop('bind_kwargs', [])
-        body.setdefault('kwargs', {})
-        if 'dispatch_time' in bind_kwargs:
-            body['kwargs']['dispatch_time'] = tz_now().isoformat()
-        if 'worker_tasks' in bind_kwargs:
-            worker_tasks = {}
-            for worker in self.workers:
-                worker.calculate_managed_tasks()
-                worker_tasks[worker.pid] = list(worker.managed_tasks.keys())
-            body['kwargs']['worker_tasks'] = worker_tasks
-
-    def up(self):
-        if self.full:
-            # if we can't spawn more workers, just toss this message into a
-            # random worker's backlog
-            idx = random.choice(range(len(self.workers)))
-            return idx, self.workers[idx]
-        else:
-            self.scale_up_ct += 1
-            ret = super(AutoscalePool, self).up()
-            new_worker_ct = len(self.workers)
-            if new_worker_ct > self.worker_count_max:
-                self.worker_count_max = new_worker_ct
-            return ret
-
-    @staticmethod
-    def fast_task_serialization(current_task):
-        try:
-            return str(current_task.get('task')) + ' - ' + str(sorted(current_task.get('args', []))) + ' - ' + str(sorted(current_task.get('kwargs', {})))
-        except Exception:
-            # just make sure this does not make things worse
-            return str(current_task)
-
-    def write(self, preferred_queue, body):
-        if 'guid' in body:
-            set_guid(body['guid'])
-        try:
-            if isinstance(body, dict) and body.get('bind_kwargs'):
-                self.add_bind_kwargs(body)
-            if self.should_grow:
-                self.up()
-            # we don't care about "preferred queue" round robin distribution, just
-            # find the first non-busy worker and claim it
-            workers = self.workers[:]
-            random.shuffle(workers)
-            for w in workers:
-                if not w.busy:
-                    w.put(body)
-                    break
-            else:
-                task_name = 'unknown'
-                if isinstance(body, dict):
-                    task_name = body.get('task')
-                logger.warning(f'Workers maxed, queuing {task_name}, load: {sum(len(w.managed_tasks) for w in self.workers)} / {len(self.workers)}')
-                # Once every 10 seconds write out task list for debugging
-                if time.monotonic() - self.last_task_list_log >= 10.0:
-                    task_counts = {}
-                    for worker in self.workers:
-                        task_slug = self.fast_task_serialization(worker.current_task)
-                        task_counts.setdefault(task_slug, 0)
-                        task_counts[task_slug] += 1
-                    logger.info(f'Running tasks by count:\n{json.dumps(task_counts, indent=2)}')
-                    self.last_task_list_log = time.monotonic()
-            return super(AutoscalePool, self).write(preferred_queue, body)
-        except Exception:
-            for conn in connections.all():
-                # If the database connection has a hiccup, re-establish a new
-                # connection
-                conn.close_if_unusable_or_obsolete()
-            logger.exception('failed to write inbound message')
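
The largest removal above is AutoscalePool and its sizing helper. For reference while reviewing the worker sizing that replaces it, the deleted get_auto_max_workers() heuristic reduces to the sketch below. It uses the same capacity helpers the deleted imports pulled in; the reserve parameter only names the magic "+7" from the removed comment, and nothing here is part of this change.

```python
# Sketch of the removed sizing heuristic, for comparison only. The helpers are the same ones
# the deleted pool.py imports referenced (awx.main.utils.common and ansible-runner).
from ansible_runner.utils.capacity import get_cpu_count, get_mem_in_bytes

from awx.main.utils.common import (
    get_corrected_cpu,
    get_corrected_memory,
    get_cpu_effective_capacity,
    get_mem_effective_capacity,
)


def auto_max_workers(reserve=7):
    """Size the pool above Instance.capacity so the task manager cannot over-schedule this node."""
    mem_capacity = get_mem_effective_capacity(get_corrected_memory(get_mem_in_bytes()), is_control_node=True)
    cpu_capacity = get_cpu_effective_capacity(get_corrected_cpu(get_cpu_count()), is_control_node=True)
    # Take the larger constraint, then keep a few spare workers for heartbeat traffic.
    return max(mem_capacity, cpu_capacity) + reserve
```
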
diff --git a/awx/main/dispatch/worker/__init__.py b/awx/main/dispatch/worker/__init__.py
index 6fe8f64608..3d04184ef5 100644
--- a/awx/main/dispatch/worker/__init__.py
+++ b/awx/main/dispatch/worker/__init__.py
@@ -1,3 +1,3 @@
-from .base import AWXConsumerRedis, AWXConsumerPG, BaseWorker  # noqa
+from .base import AWXConsumerRedis, BaseWorker  # noqa
 from .callback import CallbackBrokerWorker  # noqa
 from .task import TaskWorker  # noqa
diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py
index db89fc923e..481df72393 100644
--- a/awx/main/dispatch/worker/base.py
+++ b/awx/main/dispatch/worker/base.py
@@ -6,25 +6,17 @@ import logging
 import signal
 import sys
 import redis
-import json
-import psycopg
 import time
-from uuid import UUID
 from queue import Empty as QueueEmpty
-from datetime import timedelta
 
 from django import db
 from django.conf import settings
 
 import redis.exceptions
-from ansible_base.lib.logging.runtime import log_excess_runtime
 
 from awx.main.utils.redis import get_redis_client
 from awx.main.dispatch.pool import WorkerPool
-from awx.main.dispatch.periodic import Scheduler
-from awx.main.dispatch import pg_bus_conn
 from awx.main.utils.db import set_connection_name
-import awx.main.analytics.subsystem_metrics as s_metrics
 
 if 'run_callback_receiver' in sys.argv:
     logger = logging.getLogger('awx.main.commands.run_callback_receiver')
@@ -62,85 +54,6 @@ class AWXConsumerBase(object):
         self.pool.init_workers(self.worker.work_loop)
         self.redis = get_redis_client()
 
-    @property
-    def listening_on(self):
-        return f'listening on {self.queues}'
-
-    def control(self, body):
-        logger.warning(f'Received control signal:\n{body}')
-        control = body.get('control')
-        if control in ('status', 'schedule', 'running', 'cancel'):
-            reply_queue = body['reply_to']
-            if control == 'status':
-                msg = '\n'.join([self.listening_on, self.pool.debug()])
-            if control == 'schedule':
-                msg = self.scheduler.debug()
-            elif control == 'running':
-                msg = []
-                for worker in self.pool.workers:
-                    worker.calculate_managed_tasks()
-                    msg.extend(worker.managed_tasks.keys())
-            elif control == 'cancel':
-                msg = []
-                task_ids = set(body['task_ids'])
-                for worker in self.pool.workers:
-                    task = worker.current_task
-                    if task and task['uuid'] in task_ids:
-                        logger.warn(f'Sending SIGTERM to task id={task["uuid"]}, task={task.get("task")}, args={task.get("args")}')
-                        os.kill(worker.pid, signal.SIGTERM)
-                        msg.append(task['uuid'])
-                if task_ids and not msg:
-                    logger.info(f'Could not locate running tasks to cancel with ids={task_ids}')
-
-            if reply_queue is not None:
-                with pg_bus_conn() as conn:
-                    conn.notify(reply_queue, json.dumps(msg))
-        elif control == 'reload':
-            for worker in self.pool.workers:
-                worker.quit()
-        else:
-            logger.error('unrecognized control message: {}'.format(control))
-
-    def dispatch_task(self, body):
-        """This will place the given body into a worker queue to run method decorated as a task"""
-        if isinstance(body, dict):
-            body['time_ack'] = time.time()
-
-        if len(self.pool):
-            if "uuid" in body and body['uuid']:
-                try:
-                    queue = UUID(body['uuid']).int % len(self.pool)
-                except Exception:
-                    queue = self.total_messages % len(self.pool)
-            else:
-                queue = self.total_messages % len(self.pool)
-        else:
-            queue = 0
-        self.pool.write(queue, body)
-        self.total_messages += 1
-
-    def process_task(self, body):
-        """Routes the task details in body as either a control task or a task-task"""
-        if 'control' in body:
-            try:
-                return self.control(body)
-            except Exception:
-                logger.exception(f"Exception handling control message: {body}")
-                return
-        self.dispatch_task(body)
-
-    @log_excess_runtime(logger, debug_cutoff=0.05, cutoff=0.2)
-    def record_statistics(self):
-        if time.time() - self.last_stats > 1:  # buffer stat recording to once per second
-            save_data = self.pool.debug()
-            try:
-                self.redis.set(f'awx_{self.name}_statistics', save_data)
-            except redis.exceptions.ConnectionError as exc:
-                logger.warning(f'Redis connection error saving {self.name} status data:\n{exc}\nmissed data:\n{save_data}')
-            except Exception:
-                logger.exception(f"Unknown redis error saving {self.name} status data:\nmissed data:\n{save_data}")
-            self.last_stats = time.time()
-
     def run(self, *args, **kwargs):
         signal.signal(signal.SIGINT, self.stop)
         signal.signal(signal.SIGTERM, self.stop)
@@ -165,140 +78,6 @@ class AWXConsumerRedis(AWXConsumerBase):
             time.sleep(60)
 
 
-class AWXConsumerPG(AWXConsumerBase):
-    def __init__(self, *args, schedule=None, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.pg_max_wait = getattr(settings, 'DISPATCHER_DB_DOWNTOWN_TOLLERANCE', settings.DISPATCHER_DB_DOWNTIME_TOLERANCE)
-        # if no successful loops have ran since startup, then we should fail right away
-        self.pg_is_down = True  # set so that we fail if we get database errors on startup
-        init_time = time.time()
-        self.pg_down_time = init_time - self.pg_max_wait  # allow no grace period
-        self.last_cleanup = init_time
-        self.subsystem_metrics = s_metrics.DispatcherMetrics(auto_pipe_execute=False)
-        self.last_metrics_gather = init_time
-        self.listen_cumulative_time = 0.0
-        if schedule:
-            schedule = schedule.copy()
-        else:
-            schedule = {}
-        # add control tasks to be ran at regular schedules
-        # NOTE: if we run out of database connections, it is important to still run cleanup
-        # so that we scale down workers and free up connections
-        schedule['pool_cleanup'] = {'control': self.pool.cleanup, 'schedule': timedelta(seconds=60)}
-        # record subsystem metrics for the dispatcher
-        schedule['metrics_gather'] = {'control': self.record_metrics, 'schedule': timedelta(seconds=20)}
-        self.scheduler = Scheduler(schedule)
-
-    @log_excess_runtime(logger, debug_cutoff=0.05, cutoff=0.2)
-    def record_metrics(self):
-        current_time = time.time()
-        self.pool.produce_subsystem_metrics(self.subsystem_metrics)
-        self.subsystem_metrics.set('dispatcher_availability', self.listen_cumulative_time / (current_time - self.last_metrics_gather))
-        try:
-            self.subsystem_metrics.pipe_execute()
-        except redis.exceptions.ConnectionError as exc:
-            logger.warning(f'Redis connection error saving dispatcher metrics, error:\n{exc}')
-        self.listen_cumulative_time = 0.0
-        self.last_metrics_gather = current_time
-
-    def run_periodic_tasks(self):
-        """
-        Run general periodic logic, and return maximum time in seconds before
-        the next requested run
-        This may be called more often than that when events are consumed
-        so this should be very efficient in that
-        """
-        try:
-            self.record_statistics()  # maintains time buffer in method
-        except Exception as exc:
-            logger.warning(f'Failed to save dispatcher statistics {exc}')
-
-        # Everything benchmarks to the same original time, so that skews due to
-        # runtime of the actions, themselves, do not mess up scheduling expectations
-        reftime = time.time()
-
-        for job in self.scheduler.get_and_mark_pending(reftime=reftime):
-            if 'control' in job.data:
-                try:
-                    job.data['control']()
-                except Exception:
-                    logger.exception(f'Error running control task {job.data}')
-            elif 'task' in job.data:
-                body = self.worker.resolve_callable(job.data['task']).get_async_body()
-                # bypasses pg_notify for scheduled tasks
-                self.dispatch_task(body)
-
-        if self.pg_is_down:
-            logger.info('Dispatcher listener connection established')
-            self.pg_is_down = False
-
-        self.listen_start = time.time()
-
-        return self.scheduler.time_until_next_run(reftime=reftime)
-
-    def run(self, *args, **kwargs):
-        super(AWXConsumerPG, self).run(*args, **kwargs)
-
-        logger.info(f"Running {self.name}, workers min={self.pool.min_workers} max={self.pool.max_workers}, listening to queues {self.queues}")
-        init = False
-
-        while True:
-            try:
-                with pg_bus_conn(new_connection=True) as conn:
-                    for queue in self.queues:
-                        conn.listen(queue)
-                    if init is False:
-                        self.worker.on_start()
-                        init = True
-                    # run_periodic_tasks run scheduled actions and gives time until next scheduled action
-                    # this is saved to the conn (PubSub) object in order to modify read timeout in-loop
-                    conn.select_timeout = self.run_periodic_tasks()
-                    # this is the main operational loop for awx-manage run_dispatcher
-                    for e in conn.events(yield_timeouts=True):
-                        self.listen_cumulative_time += time.time() - self.listen_start  # for metrics
-                        if e is not None:
-                            self.process_task(json.loads(e.payload))
-                        conn.select_timeout = self.run_periodic_tasks()
-                        if self.should_stop:
-                            return
-            except psycopg.InterfaceError:
-                logger.warning("Stale Postgres message bus connection, reconnecting")
-                continue
-            except (db.DatabaseError, psycopg.OperationalError):
-                # If we have attained stady state operation, tolerate short-term database hickups
-                if not self.pg_is_down:
-                    logger.exception(f"Error consuming new events from postgres, will retry for {self.pg_max_wait} s")
-                    self.pg_down_time = time.time()
-                    self.pg_is_down = True
-                current_downtime = time.time() - self.pg_down_time
-                if current_downtime > self.pg_max_wait:
-                    logger.exception(f"Postgres event consumer has not recovered in {current_downtime} s, exiting")
-                    # Sending QUIT to multiprocess queue to signal workers to exit
-                    for worker in self.pool.workers:
-                        try:
-                            worker.quit()
-                        except Exception:
-                            logger.exception(f"Error sending QUIT to worker {worker}")
-                    raise
-                # Wait for a second before next attempt, but still listen for any shutdown signals
-                for i in range(10):
-                    if self.should_stop:
-                        return
-                    time.sleep(0.1)
-                for conn in db.connections.all():
-                    conn.close_if_unusable_or_obsolete()
-            except Exception:
-                # Log unanticipated exception in addition to writing to stderr to get timestamps and other metadata
-                logger.exception('Encountered unhandled error in dispatcher main loop')
-                # Sending QUIT to multiprocess queue to signal workers to exit
-                for worker in self.pool.workers:
-                    try:
-                        worker.quit()
-                    except Exception:
-                        logger.exception(f"Error sending QUIT to worker {worker}")
-                raise
-
-
 class BaseWorker(object):
     def read(self, queue):
         return queue.get(block=True, timeout=1)
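
One behavior worth keeping in mind while reviewing the consumer that replaces AWXConsumerPG: the deleted dispatch_task() pinned a message to a worker queue by hashing its uuid, with a fallback to a running message counter. Restated in isolation below; this is a sketch of the removed selection logic only, not part of this change.

```python
# Sketch of the removed worker-queue selection: a message with a parseable uuid maps to a
# stable worker index, anything else falls back to round-robin on the running message counter.
from uuid import UUID


def pick_worker_index(body, pool_size, total_messages):
    if pool_size == 0:
        return 0
    if isinstance(body, dict) and body.get('uuid'):
        try:
            return UUID(body['uuid']).int % pool_size
        except Exception:
            pass
    return total_messages % pool_size
```
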
diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index bfffee49f9..d9a0dd6c2f 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -15,6 +15,9 @@ import subprocess
 import tempfile
 from collections import OrderedDict
 
+# Dispatcher
+from dispatcherd.factories import get_control_from_settings
+
 # Django
 from django.conf import settings
 from django.db import models, connection, transaction
@@ -1499,7 +1502,6 @@ class UnifiedJob(
         # Special case for task manager (used during workflow job cancellation)
         if not connection.get_autocommit():
             try:
-                from dispatcherd.factories import get_control_from_settings
                 ctl = get_control_from_settings()
 
                 ctl.control('cancel', data={'uuid': self.celery_task_id})
@@ -1510,7 +1512,6 @@ class UnifiedJob(
         # Standard case with reply
         try:
             timeout = 5
-            from dispatcherd.factories import get_control_from_settings
             ctl = get_control_from_settings()
 
             results = ctl.control_with_reply('cancel', data={'uuid': self.celery_task_id}, expected_replies=1, timeout=timeout)
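
The hunks above also show the replacement control path: cancellation now goes through dispatcherd's control client rather than awx.main.dispatch.control. A minimal usage sketch follows; the ctl.control and ctl.control_with_reply calls are copied from the context lines above, while the wrapper function name is hypothetical and only there to frame the two cases.

```python
# Minimal sketch of the dispatcherd-based cancel seen above. cancel_unified_job() is a
# hypothetical wrapper; the control calls themselves mirror the hunk's context lines.
from django.db import connection

from dispatcherd.factories import get_control_from_settings


def cancel_unified_job(celery_task_id, timeout=5):
    ctl = get_control_from_settings()
    if not connection.get_autocommit():
        # Inside a transaction (the task manager / workflow cancellation case above):
        # send the cancel without waiting for a reply.
        ctl.control('cancel', data={'uuid': celery_task_id})
        return None
    return ctl.control_with_reply('cancel', data={'uuid': celery_task_id}, expected_replies=1, timeout=timeout)
```
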
diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py
index a7cade0527..6c89f57258 100644
--- a/awx/main/tasks/system.py
+++ b/awx/main/tasks/system.py
@@ -622,40 +622,8 @@ def inspect_execution_and_hop_nodes(instance_list):
             execution_node_health_check.apply_async([hostname])
 
 
-@task(queue=get_task_queuename, bind_kwargs=['dispatch_time', 'worker_tasks'])
-def cluster_node_heartbeat(dispatch_time=None, worker_tasks=None):
-    """
-    Original implementation for AWX dispatcher.
-    Uses worker_tasks from bind_kwargs to track running tasks.
-    """
-    # Run common instance management logic
-    this_inst, instance_list, lost_instances = _heartbeat_instance_management()
-    if this_inst is None:
-        return  # Early return case from instance management
-
-    # Check versions
-    _heartbeat_check_versions(this_inst, instance_list)
-
-    # Handle lost instances
-    _heartbeat_handle_lost_instances(lost_instances, this_inst)
-
-    # Run local reaper - original implementation using worker_tasks
-    if worker_tasks is not None:
-        active_task_ids = []
-        for task_list in worker_tasks.values():
-            active_task_ids.extend(task_list)
-
-        # Convert dispatch_time to datetime
-        ref_time = datetime.fromisoformat(dispatch_time) if dispatch_time else now()
-
-        reaper.reap(instance=this_inst, excluded_uuids=active_task_ids, ref_time=ref_time)
-
-        if max(len(task_list) for task_list in worker_tasks.values()) <= 1:
-            reaper.reap_waiting(instance=this_inst, excluded_uuids=active_task_ids, ref_time=ref_time)
-
-
 @task(queue=get_task_queuename, bind=True)
-def adispatch_cluster_node_heartbeat(binder):
+def cluster_node_heartbeat(binder):
     """
     Dispatcherd implementation.
     Uses Control API to get running tasks.
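
The deleted heartbeat variant carried the local reaper bookkeeping that the remaining dispatcherd implementation must reproduce with uuids obtained from the Control API. Restated below for comparison while reviewing; every name comes from the removed function, and the helper wrapper itself is only illustrative.

```python
# Sketch of the reaper logic the removed cluster_node_heartbeat performed, kept here so the
# dispatcherd-based implementation can be checked against it.
from datetime import datetime

from django.utils.timezone import now

from awx.main.dispatch import reaper


def reap_local_jobs(this_inst, worker_tasks, dispatch_time=None):
    active_task_ids = []
    for task_list in worker_tasks.values():
        active_task_ids.extend(task_list)
    ref_time = datetime.fromisoformat(dispatch_time) if dispatch_time else now()
    reaper.reap(instance=this_inst, excluded_uuids=active_task_ids, ref_time=ref_time)
    # Waiting jobs were only reaped when every worker reported at most one task.
    if max(len(task_list) for task_list in worker_tasks.values()) <= 1:
        reaper.reap_waiting(instance=this_inst, excluded_uuids=active_task_ids, ref_time=ref_time)
```
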
diff --git a/awx/main/tests/functional/test_dispatch.py b/awx/main/tests/functional/test_dispatch.py
index 2c3a665ad9..44fad8b733 100644
--- a/awx/main/tests/functional/test_dispatch.py
+++ b/awx/main/tests/functional/test_dispatch.py
@@ -5,7 +5,7 @@ import pytest
 
 from awx.main.models import Job, WorkflowJob, Instance
 from awx.main.dispatch import reaper
-from awx.main.dispatch.publish import task
+from dispatcherd.publish import task
 
 '''
 Prevent logger. calls from triggering database operations
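
The test module now takes its task decorator from dispatcherd. Assuming the decorator keeps the call signature of the AWX one it replaces (which the otherwise-unchanged test bodies and the @task(queue=..., bind=True) usage in system.py suggest), task definitions look the same apart from the import. The function and queue name below are invented purely for illustration.

```python
# Hypothetical illustration only: 'add' and 'example_queue' are made up, and the queue= keyword
# is assumed to match the signature of the AWX decorator this import replaces.
from dispatcherd.publish import task


@task(queue='example_queue')
def add(a, b):
    return a + b
```
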
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index 1f4a472bc9..04490866f7 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -454,7 +454,7 @@ for options in CELERYBEAT_SCHEDULE.values():
     task_name = options['task']
     # Handle the only one exception case of the heartbeat which has a new implementation
     if task_name == 'awx.main.tasks.system.cluster_node_heartbeat':
-        task_name = 'awx.main.tasks.system.adispatch_cluster_node_heartbeat'
+        task_name = 'awx.main.tasks.system.cluster_node_heartbeat'
     new_options['task'] = task_name
     new_options['schedule'] = options['schedule'].total_seconds()
     DISPATCHER_SCHEDULE[task_name] = new_options
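
For context on the settings hunk above: the surrounding loop converts Celery-beat style entries (timedelta schedules) into DISPATCHER_SCHEDULE entries keyed by task name with plain seconds, and with the heartbeat rename gone it becomes a straight pass-through. The worked example below is illustrative only; how new_options is constructed is outside the visible hunk, so building it as a copy of the beat entry is an assumption.

```python
# Illustrative only: mirrors the visible lines of the conversion loop above. The beat entry and
# the copy into new_options are assumptions, since that part of the loop is not shown in the hunk.
from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'cluster_node_heartbeat': {
        'task': 'awx.main.tasks.system.cluster_node_heartbeat',
        'schedule': timedelta(seconds=60),
    },
}

DISPATCHER_SCHEDULE = {}
for options in CELERYBEAT_SCHEDULE.values():
    new_options = dict(options)  # assumed; not visible in the hunk
    task_name = options['task']
    new_options['task'] = task_name
    new_options['schedule'] = options['schedule'].total_seconds()
    DISPATCHER_SCHEDULE[task_name] = new_options

# DISPATCHER_SCHEDULE now maps the dotted task path to {'task': ..., 'schedule': 60.0}
```
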