From e3863264986883cd1dbafced3d4770f8c344417c Mon Sep 17 00:00:00 2001 From: thedoubl3j Date: Mon, 12 Jan 2026 10:06:51 -0500 Subject: [PATCH] Remove control and hazmat (squash this not done) * moved status out and deleted control as no longer needed * removed hazmat --- awx/main/dispatch/control.py | 38 ---- awx/main/dispatch/pool.py | 193 +----------------- awx/main/dispatch/worker/base.py | 13 +- awx/main/dispatch/worker/callback.py | 2 +- .../commands/run_callback_receiver.py | 11 +- 5 files changed, 22 insertions(+), 235 deletions(-) delete mode 100644 awx/main/dispatch/control.py diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py deleted file mode 100644 index 976d7ae44f..0000000000 --- a/awx/main/dispatch/control.py +++ /dev/null @@ -1,38 +0,0 @@ -import logging -import uuid -import json - -from django.db import connection - -from awx.main.dispatch import get_task_queuename -from awx.main.utils.redis import get_redis_client - -from . import pg_bus_conn - -logger = logging.getLogger('awx.main.dispatch') - - -class Control(object): - services = ('dispatcher', 'callback_receiver') - result = None - - def __init__(self, service, host=None): - if service not in self.services: - raise RuntimeError('{} must be in {}'.format(service, self.services)) - self.service = service - self.queuename = host or get_task_queuename() - - def status(self, *args, **kwargs): - r = get_redis_client() - if self.service == 'dispatcher': - stats = r.get(f'awx_{self.service}_statistics') or b'' - return stats.decode('utf-8') - else: - workers = [] - for key in r.keys('awx_callback_receiver_statistics_*'): - workers.append(r.get(key).decode('utf-8')) - return '\n'.join(workers) - - @classmethod - def generate_reply_queue_name(cls): - return f"reply_to_{str(uuid.uuid4()).replace('-','_')}" diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index a7162e9c40..e660751e16 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -18,10 +18,7 @@ from jinja2 import Template import psutil -if 'run_callback_receiver' in sys.argv: - logger = logging.getLogger('awx.main.commands.run_callback_receiver') -else: - logger = logging.getLogger('awx.main.dispatch') +logger = logging.getLogger('awx.main.commands.run_callback_receiver') class NoOpResultQueue(object): @@ -58,142 +55,14 @@ class PoolWorker(object): It is "idle" when self.managed_tasks is empty. """ - track_managed_tasks = False - - def __init__(self, queue_size, target, args, **kwargs): - self.messages_sent = 0 - self.messages_finished = 0 - self.managed_tasks = collections.OrderedDict() - self.finished = MPQueue(queue_size) if self.track_managed_tasks else NoOpResultQueue() - self.queue = MPQueue(queue_size) - self.process = Process(target=target, args=(self.queue, self.finished) + args) + def __init__(self, target, args, **kwargs): + self.process = Process(target=target, args=args) self.process.daemon = True self.creation_time = time.monotonic() - self.retiring = False def start(self): self.process.start() - def put(self, body): - if self.retiring: - uuid = body.get('uuid', 'N/A') if isinstance(body, dict) else 'N/A' - logger.info(f"Worker pid:{self.pid} is retiring. Refusing new task {uuid}.") - raise QueueFull("Worker is retiring and not accepting new tasks") - uuid = '?' - if isinstance(body, dict): - if not body.get('uuid'): - body['uuid'] = str(uuid4()) - uuid = body['uuid'] - if self.track_managed_tasks: - self.managed_tasks[uuid] = body - self.queue.put(body, block=True, timeout=5) - self.messages_sent += 1 - self.calculate_managed_tasks() - - def quit(self): - """ - Send a special control message to the worker that tells it to exit - gracefully. - """ - self.queue.put('QUIT') - - @property - def age(self): - """Returns the current age of the worker in seconds.""" - return time.monotonic() - self.creation_time - - @property - def pid(self): - return self.process.pid - - @property - def qsize(self): - return self.queue.qsize() - - @property - def alive(self): - return self.process.is_alive() - - @property - def mb(self): - if self.alive: - return '{:0.3f}'.format(psutil.Process(self.pid).memory_info().rss / 1024.0 / 1024.0) - return '0' - - @property - def exitcode(self): - return str(self.process.exitcode) - - def calculate_managed_tasks(self): - if not self.track_managed_tasks: - return - # look to see if any tasks were finished - finished = [] - for _ in range(self.finished.qsize()): - try: - finished.append(self.finished.get(block=False)) - except QueueEmpty: - break # qsize is not always _totally_ up to date - - # if any tasks were finished, removed them from the managed tasks for - # this worker - for uuid in finished: - try: - del self.managed_tasks[uuid] - self.messages_finished += 1 - except KeyError: - # ansible _sometimes_ appears to send events w/ duplicate UUIDs; - # UUIDs for ansible events are *not* actually globally unique - # when this occurs, it's _fine_ to ignore this KeyError because - # the purpose of self.managed_tasks is to just track internal - # state of which events are *currently* being processed. - logger.warning('Event UUID {} appears to be have been duplicated.'.format(uuid)) - - @property - def current_task(self): - if not self.track_managed_tasks: - return None - self.calculate_managed_tasks() - # the task at [0] is the one that's running right now (or is about to - # be running) - if len(self.managed_tasks): - return self.managed_tasks[list(self.managed_tasks.keys())[0]] - - return None - - @property - def orphaned_tasks(self): - if not self.track_managed_tasks: - return [] - orphaned = [] - if not self.alive: - # if this process had a running task that never finished, - # requeue its error callbacks - current_task = self.current_task - if isinstance(current_task, dict): - orphaned.extend(current_task.get('errbacks', [])) - - # if this process has any pending messages requeue them - for _ in range(self.qsize): - try: - message = self.queue.get(block=False) - if message != 'QUIT': - orphaned.append(message) - except QueueEmpty: - break # qsize is not always _totally_ up to date - if len(orphaned): - logger.error('requeuing {} messages from gone worker pid:{}'.format(len(orphaned), self.pid)) - return orphaned - - @property - def busy(self): - self.calculate_managed_tasks() - return len(self.managed_tasks) > 0 - - @property - def idle(self): - return not self.busy - class WorkerPool(object): """ @@ -219,11 +88,10 @@ class WorkerPool(object): pool_cls = PoolWorker debug_meta = '' - def __init__(self, min_workers=None, queue_size=None): + def __init__(self, workers_num=None): self.name = settings.CLUSTER_HOST_ID self.pid = os.getpid() - self.min_workers = min_workers or settings.JOB_EVENT_WORKERS - self.queue_size = queue_size or settings.JOB_EVENT_MAX_QUEUE_SIZE + self.workers_num = workers_num or settings.JOB_EVENT_WORKERS self.workers = [] def __len__(self): @@ -232,7 +100,7 @@ class WorkerPool(object): def init_workers(self, target, *target_args): self.target = target self.target_args = target_args - for idx in range(self.min_workers): + for idx in range(self.workers_num): self.up() def up(self): @@ -242,7 +110,7 @@ class WorkerPool(object): # for the DB and cache connections (that way lies race conditions) django_connection.close() django_cache.close() - worker = self.pool_cls(self.queue_size, self.target, (idx,) + self.target_args) + worker = self.pool_cls(self.target, (idx,) + self.target_args) self.workers.append(worker) try: worker.start() @@ -252,53 +120,6 @@ class WorkerPool(object): logger.debug('scaling up worker pid:{}'.format(worker.pid)) return idx, worker - def debug(self, *args, **kwargs): - tmpl = Template( - 'Recorded at: {{ dt }} \n' - '{{ pool.name }}[pid:{{ pool.pid }}] workers total={{ workers|length }} {{ meta }} \n' - '{% for w in workers %}' - '. worker[pid:{{ w.pid }}]{% if not w.alive %} GONE exit={{ w.exitcode }}{% endif %}' - ' sent={{ w.messages_sent }}' - ' age={{ "%.0f"|format(w.age) }}s' - ' retiring={{ w.retiring }}' - '{% if w.messages_finished %} finished={{ w.messages_finished }}{% endif %}' - ' qsize={{ w.managed_tasks|length }}' - ' rss={{ w.mb }}MB' - '{% for task in w.managed_tasks.values() %}' - '\n - {% if loop.index0 == 0 %}running {% if "age" in task %}for: {{ "%.1f" % task["age"] }}s {% endif %}{% else %}queued {% endif %}' - '{{ task["uuid"] }} ' - '{% if "task" in task %}' - '{{ task["task"].rsplit(".", 1)[-1] }}' - # don't print kwargs, they often contain launch-time secrets - '(*{{ task.get("args", []) }})' - '{% endif %}' - '{% endfor %}' - '{% if not w.managed_tasks|length %}' - ' [IDLE]' - '{% endif %}' - '\n' - '{% endfor %}' - ) - now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC') - return tmpl.render(pool=self, workers=self.workers, meta=self.debug_meta, dt=now) - - def write(self, preferred_queue, body): - queue_order = sorted(range(len(self.workers)), key=lambda x: -1 if x == preferred_queue else x) - write_attempt_order = [] - for queue_actual in queue_order: - try: - self.workers[queue_actual].put(body) - return queue_actual - except QueueFull: - pass - except Exception: - tb = traceback.format_exc() - logger.warning("could not write to queue %s" % preferred_queue) - logger.warning("detail: {}".format(tb)) - write_attempt_order.append(preferred_queue) - logger.error("could not write payload to any queue, attempted order: {}".format(write_attempt_order)) - return None - def stop(self, signum): try: for worker in self.workers: diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index 481df72393..f3f9f14cf0 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -79,10 +79,10 @@ class AWXConsumerRedis(AWXConsumerBase): class BaseWorker(object): - def read(self, queue): - return queue.get(block=True, timeout=1) + def read(self): + raise NotImplemented() - def work_loop(self, queue, finished, idx, *args): + def work_loop(self, idx, *args): ppid = os.getppid() signal_handler = WorkerSignalHandler() set_connection_name('worker') # set application_name to distinguish from other dispatcher processes @@ -92,7 +92,7 @@ class BaseWorker(object): if os.getppid() != ppid: break try: - body = self.read(queue) + body = self.read() if body == 'QUIT': break except QueueEmpty: @@ -108,10 +108,7 @@ class BaseWorker(object): self.perform_work(body, *args) except Exception: logger.exception(f'Unhandled exception in perform_work in worker pid={os.getpid()}') - finally: - if 'uuid' in body: - uuid = body['uuid'] - finished.put(uuid) + logger.debug('worker exiting gracefully pid:{}'.format(os.getpid())) def perform_work(self, body): diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index 60f01e380e..503d978cac 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -86,7 +86,7 @@ class CallbackBrokerWorker(BaseWorker): """This needs to be obtained after forking, or else it will give the parent process""" return os.getpid() - def read(self, queue): + def read(self): has_redis_error = False try: res = self.redis.blpop(self.queue_name, timeout=1) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 8f67909dad..c450d6dc72 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -8,8 +8,8 @@ from django.core.management.base import BaseCommand, CommandError import redis.exceptions from awx.main.analytics.subsystem_metrics import CallbackReceiverMetricsServer -from awx.main.dispatch.control import Control from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker +from awx.main.utils.redis import get_redis_client class Command(BaseCommand): @@ -26,7 +26,7 @@ class Command(BaseCommand): def handle(self, *arg, **options): if options.get('status'): - print(Control('callback_receiver').status()) + print(self.status()) return consumer = None @@ -46,3 +46,10 @@ class Command(BaseCommand): print('Terminating Callback Receiver') if consumer: consumer.stop() + + def status(self, *args, **kwargs): + r = get_redis_client() + workers = [] + for key in r.keys('awx_callback_receiver_statistics_*'): + workers.append(r.get(key).decode('utf-8')) + return '\n'.join(workers)