Remove control and hazmat (squash this not done)

* moved status out of Control and deleted the class, since it is no longer needed
* removed hazmat
thedoubl3j
2026-01-12 10:06:51 -05:00
parent 5209bfcf82
commit e386326498
5 changed files with 22 additions and 235 deletions
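
The user-visible effect: the status query keeps working, but its implementation moves. Before this commit it went through the now-deleted Control abstraction:

from awx.main.dispatch.control import Control

print(Control('callback_receiver').status())  # scanned Redis behind the scenes

After it, the same Redis scan lives on the run_callback_receiver management command as a status() method (last file below), so the awx.main.dispatch.control import target disappears.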

View File

@@ -1,38 +0,0 @@
-import logging
-import uuid
-import json
-
-from django.db import connection
-
-from awx.main.dispatch import get_task_queuename
-from awx.main.utils.redis import get_redis_client
-
-from . import pg_bus_conn
-
-logger = logging.getLogger('awx.main.dispatch')
-
-
-class Control(object):
-    services = ('dispatcher', 'callback_receiver')
-    result = None
-
-    def __init__(self, service, host=None):
-        if service not in self.services:
-            raise RuntimeError('{} must be in {}'.format(service, self.services))
-        self.service = service
-        self.queuename = host or get_task_queuename()
-
-    def status(self, *args, **kwargs):
-        r = get_redis_client()
-        if self.service == 'dispatcher':
-            stats = r.get(f'awx_{self.service}_statistics') or b''
-            return stats.decode('utf-8')
-        else:
-            workers = []
-            for key in r.keys('awx_callback_receiver_statistics_*'):
-                workers.append(r.get(key).decode('utf-8'))
-            return '\n'.join(workers)
-
-    @classmethod
-    def generate_reply_queue_name(cls):
-        return f"reply_to_{str(uuid.uuid4()).replace('-','_')}"

View File

@@ -18,10 +18,7 @@ from jinja2 import Template
 import psutil
 
-if 'run_callback_receiver' in sys.argv:
-    logger = logging.getLogger('awx.main.commands.run_callback_receiver')
-else:
-    logger = logging.getLogger('awx.main.dispatch')
+logger = logging.getLogger('awx.main.commands.run_callback_receiver')
 
 
 class NoOpResultQueue(object):
@@ -58,142 +55,14 @@ class PoolWorker(object):
     It is "idle" when self.managed_tasks is empty.
     """
 
-    track_managed_tasks = False
-
-    def __init__(self, queue_size, target, args, **kwargs):
-        self.messages_sent = 0
-        self.messages_finished = 0
-        self.managed_tasks = collections.OrderedDict()
-        self.finished = MPQueue(queue_size) if self.track_managed_tasks else NoOpResultQueue()
-        self.queue = MPQueue(queue_size)
-        self.process = Process(target=target, args=(self.queue, self.finished) + args)
+    def __init__(self, target, args, **kwargs):
+        self.process = Process(target=target, args=args)
         self.process.daemon = True
         self.creation_time = time.monotonic()
         self.retiring = False
 
     def start(self):
         self.process.start()
 
-    def put(self, body):
-        if self.retiring:
-            uuid = body.get('uuid', 'N/A') if isinstance(body, dict) else 'N/A'
-            logger.info(f"Worker pid:{self.pid} is retiring. Refusing new task {uuid}.")
-            raise QueueFull("Worker is retiring and not accepting new tasks")
-        uuid = '?'
-        if isinstance(body, dict):
-            if not body.get('uuid'):
-                body['uuid'] = str(uuid4())
-            uuid = body['uuid']
-        if self.track_managed_tasks:
-            self.managed_tasks[uuid] = body
-        self.queue.put(body, block=True, timeout=5)
-        self.messages_sent += 1
-        self.calculate_managed_tasks()
-
-    def quit(self):
-        """
-        Send a special control message to the worker that tells it to exit
-        gracefully.
-        """
-        self.queue.put('QUIT')
-
-    @property
-    def age(self):
-        """Returns the current age of the worker in seconds."""
-        return time.monotonic() - self.creation_time
-
-    @property
-    def pid(self):
-        return self.process.pid
-
-    @property
-    def qsize(self):
-        return self.queue.qsize()
-
-    @property
-    def alive(self):
-        return self.process.is_alive()
-
-    @property
-    def mb(self):
-        if self.alive:
-            return '{:0.3f}'.format(psutil.Process(self.pid).memory_info().rss / 1024.0 / 1024.0)
-        return '0'
-
-    @property
-    def exitcode(self):
-        return str(self.process.exitcode)
-
-    def calculate_managed_tasks(self):
-        if not self.track_managed_tasks:
-            return
-        # look to see if any tasks were finished
-        finished = []
-        for _ in range(self.finished.qsize()):
-            try:
-                finished.append(self.finished.get(block=False))
-            except QueueEmpty:
-                break  # qsize is not always _totally_ up to date
-
-        # if any tasks were finished, removed them from the managed tasks for
-        # this worker
-        for uuid in finished:
-            try:
-                del self.managed_tasks[uuid]
-                self.messages_finished += 1
-            except KeyError:
-                # ansible _sometimes_ appears to send events w/ duplicate UUIDs;
-                # UUIDs for ansible events are *not* actually globally unique
-                # when this occurs, it's _fine_ to ignore this KeyError because
-                # the purpose of self.managed_tasks is to just track internal
-                # state of which events are *currently* being processed.
-                logger.warning('Event UUID {} appears to be have been duplicated.'.format(uuid))
-
-    @property
-    def current_task(self):
-        if not self.track_managed_tasks:
-            return None
-        self.calculate_managed_tasks()
-        # the task at [0] is the one that's running right now (or is about to
-        # be running)
-        if len(self.managed_tasks):
-            return self.managed_tasks[list(self.managed_tasks.keys())[0]]
-        return None
-
-    @property
-    def orphaned_tasks(self):
-        if not self.track_managed_tasks:
-            return []
-        orphaned = []
-        if not self.alive:
-            # if this process had a running task that never finished,
-            # requeue its error callbacks
-            current_task = self.current_task
-            if isinstance(current_task, dict):
-                orphaned.extend(current_task.get('errbacks', []))
-
-            # if this process has any pending messages requeue them
-            for _ in range(self.qsize):
-                try:
-                    message = self.queue.get(block=False)
-                    if message != 'QUIT':
-                        orphaned.append(message)
-                except QueueEmpty:
-                    break  # qsize is not always _totally_ up to date
-
-            if len(orphaned):
-                logger.error('requeuing {} messages from gone worker pid:{}'.format(len(orphaned), self.pid))
-        return orphaned
-
-    @property
-    def busy(self):
-        self.calculate_managed_tasks()
-        return len(self.managed_tasks) > 0
-
-    @property
-    def idle(self):
-        return not self.busy
-
 
 class WorkerPool(object):
     """
@@ -219,11 +88,10 @@ class WorkerPool(object):
     pool_cls = PoolWorker
     debug_meta = ''
 
-    def __init__(self, min_workers=None, queue_size=None):
+    def __init__(self, workers_num=None):
         self.name = settings.CLUSTER_HOST_ID
         self.pid = os.getpid()
-        self.min_workers = min_workers or settings.JOB_EVENT_WORKERS
-        self.queue_size = queue_size or settings.JOB_EVENT_MAX_QUEUE_SIZE
+        self.workers_num = workers_num or settings.JOB_EVENT_WORKERS
         self.workers = []
 
     def __len__(self):
@@ -232,7 +100,7 @@ class WorkerPool(object):
     def init_workers(self, target, *target_args):
         self.target = target
         self.target_args = target_args
-        for idx in range(self.min_workers):
+        for idx in range(self.workers_num):
            self.up()
 
     def up(self):
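
min_workers and queue_size collapse into one workers_num knob (default settings.JOB_EVENT_WORKERS); with no MPQueue there is nothing left to size. The wiring keeps its shape, sketched here with hedged construction details (CallbackBrokerWorker's real arguments are not shown in this diff):

from awx.main.dispatch.pool import WorkerPool
from awx.main.dispatch.worker import CallbackBrokerWorker

pool = WorkerPool(workers_num=4)       # omit to fall back to settings.JOB_EVENT_WORKERS
worker = CallbackBrokerWorker()        # hypothetical: constructor args not shown here
pool.init_workers(worker.work_loop)    # up() prepends idx, so each child runs work_loop(idx)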
@@ -242,7 +110,7 @@ class WorkerPool(object):
         # for the DB and cache connections (that way lies race conditions)
         django_connection.close()
         django_cache.close()
-        worker = self.pool_cls(self.queue_size, self.target, (idx,) + self.target_args)
+        worker = self.pool_cls(self.target, (idx,) + self.target_args)
         self.workers.append(worker)
         try:
             worker.start()
@@ -252,53 +120,6 @@ class WorkerPool(object):
         logger.debug('scaling up worker pid:{}'.format(worker.pid))
         return idx, worker
 
-    def debug(self, *args, **kwargs):
-        tmpl = Template(
-            'Recorded at: {{ dt }} \n'
-            '{{ pool.name }}[pid:{{ pool.pid }}] workers total={{ workers|length }} {{ meta }} \n'
-            '{% for w in workers %}'
-            '. worker[pid:{{ w.pid }}]{% if not w.alive %} GONE exit={{ w.exitcode }}{% endif %}'
-            ' sent={{ w.messages_sent }}'
-            ' age={{ "%.0f"|format(w.age) }}s'
-            ' retiring={{ w.retiring }}'
-            '{% if w.messages_finished %} finished={{ w.messages_finished }}{% endif %}'
-            ' qsize={{ w.managed_tasks|length }}'
-            ' rss={{ w.mb }}MB'
-            '{% for task in w.managed_tasks.values() %}'
-            '\n - {% if loop.index0 == 0 %}running {% if "age" in task %}for: {{ "%.1f" % task["age"] }}s {% endif %}{% else %}queued {% endif %}'
-            '{{ task["uuid"] }} '
-            '{% if "task" in task %}'
-            '{{ task["task"].rsplit(".", 1)[-1] }}'
-            # don't print kwargs, they often contain launch-time secrets
-            '(*{{ task.get("args", []) }})'
-            '{% endif %}'
-            '{% endfor %}'
-            '{% if not w.managed_tasks|length %}'
-            ' [IDLE]'
-            '{% endif %}'
-            '\n'
-            '{% endfor %}'
-        )
-        now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
-        return tmpl.render(pool=self, workers=self.workers, meta=self.debug_meta, dt=now)
-
-    def write(self, preferred_queue, body):
-        queue_order = sorted(range(len(self.workers)), key=lambda x: -1 if x == preferred_queue else x)
-        write_attempt_order = []
-        for queue_actual in queue_order:
-            try:
-                self.workers[queue_actual].put(body)
-                return queue_actual
-            except QueueFull:
-                pass
-            except Exception:
-                tb = traceback.format_exc()
-                logger.warning("could not write to queue %s" % preferred_queue)
-                logger.warning("detail: {}".format(tb))
-            write_attempt_order.append(preferred_queue)
-        logger.error("could not write payload to any queue, attempted order: {}".format(write_attempt_order))
-        return None
-
     def stop(self, signum):
         try:
             for worker in self.workers:
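
Deleting write() removes all parent-side routing: preferred-queue ordering, the QueueFull fallback, and the attempted-order error path. In the pull model replacing it, fan-out belongs to the broker: every worker blocks on the same Redis list and whichever pops first owns the event. A standalone sketch of that competing-consumers pattern (hypothetical key name; assumes a local Redis):

import redis

r = redis.Redis()
r.rpush('callback_tasks', '{"uuid": "abc"}')  # producer appends an event
res = r.blpop('callback_tasks', timeout=1)    # each worker blocks up to 1s
if res is not None:
    _key, payload = res                       # blpop yields a (key, value) tuple
    print(payload.decode('utf-8'))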

View File

@@ -79,10 +79,10 @@ class AWXConsumerRedis(AWXConsumerBase):
 class BaseWorker(object):
-    def read(self, queue):
-        return queue.get(block=True, timeout=1)
+    def read(self):
+        raise NotImplemented()
 
-    def work_loop(self, queue, finished, idx, *args):
+    def work_loop(self, idx, *args):
         ppid = os.getppid()
         signal_handler = WorkerSignalHandler()
         set_connection_name('worker')  # set application_name to distinguish from other dispatcher processes
@@ -92,7 +92,7 @@ class BaseWorker(object):
             if os.getppid() != ppid:
                 break
             try:
-                body = self.read(queue)
+                body = self.read()
                 if body == 'QUIT':
                     break
             except QueueEmpty:
@@ -108,10 +108,7 @@ class BaseWorker(object):
                 self.perform_work(body, *args)
             except Exception:
                 logger.exception(f'Unhandled exception in perform_work in worker pid={os.getpid()}')
-            finally:
-                if 'uuid' in body:
-                    uuid = body['uuid']
-                    finished.put(uuid)
         logger.debug('worker exiting gracefully pid:{}'.format(os.getpid()))
 
     def perform_work(self, body):
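
BaseWorker.read() goes from "drain the MPQueue the parent filled" to an abstract hook, and work_loop() loses both the queue and the finished bookkeeping; a subclass now only has to yield bodies (or 'QUIT') and raise QueueEmpty when nothing is waiting, as the loop above already expects. A minimal sketch of that surface (hypothetical subclass, standing in for the Redis-backed one in the next file):

from queue import Empty as QueueEmpty

class ListBackedWorker:
    # hypothetical stand-in for a BaseWorker subclass under the new contract
    def __init__(self, items):
        self.items = list(items)

    def read(self):
        # return the next body, or raise QueueEmpty to make work_loop poll again
        if not self.items:
            raise QueueEmpty
        return self.items.pop(0)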

View File

@@ -86,7 +86,7 @@ class CallbackBrokerWorker(BaseWorker):
         """This needs to be obtained after forking, or else it will give the parent process"""
         return os.getpid()
 
-    def read(self, queue):
+    def read(self):
         has_redis_error = False
         try:
             res = self.redis.blpop(self.queue_name, timeout=1)
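
Here the signature change is the whole point: read() already polled Redis itself, so dropping the unused queue parameter makes the callback receiver fully self-feeding. One poll cycle looks roughly like this (a hedged condensation; the error handling around has_redis_error continues past the end of this hunk and is assumed, not shown):

import logging
import redis

logger = logging.getLogger(__name__)

def read_once(r, queue_name, has_redis_error=False):
    # hypothetical one-cycle version of CallbackBrokerWorker.read()
    try:
        res = r.blpop(queue_name, timeout=1)  # None on timeout, else (key, value)
        return res, False
    except redis.exceptions.RedisError:
        if not has_redis_error:
            logger.exception('redis error while reading callback events')
        return None, True                     # flag suppresses repeat logging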

View File

@@ -8,8 +8,8 @@ from django.core.management.base import BaseCommand, CommandError
 import redis.exceptions
 
 from awx.main.analytics.subsystem_metrics import CallbackReceiverMetricsServer
-from awx.main.dispatch.control import Control
 from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker
+from awx.main.utils.redis import get_redis_client
 
 
 class Command(BaseCommand):
@@ -26,7 +26,7 @@ class Command(BaseCommand):
     def handle(self, *arg, **options):
         if options.get('status'):
-            print(Control('callback_receiver').status())
+            print(self.status())
             return
 
         consumer = None
@@ -46,3 +46,10 @@ class Command(BaseCommand):
             print('Terminating Callback Receiver')
             if consumer:
                 consumer.stop()
+
+    def status(self, *args, **kwargs):
+        r = get_redis_client()
+        workers = []
+        for key in r.keys('awx_callback_receiver_statistics_*'):
+            workers.append(r.get(key).decode('utf-8'))
+        return '\n'.join(workers)
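
The relocated status() is a verbatim copy of the callback_receiver branch of the deleted Control.status(): scan the per-worker statistics keys and join the values. A quick way to exercise it (a sketch; assumes the command's --status flag, defined outside this hunk, stores to options['status']):

from django.core.management import call_command

# prints one statistics blob per live callback-receiver worker
call_command('run_callback_receiver', status=True)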