diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py
index 6b3c13499d..c7a623c8d9 100644
--- a/awx/main/dispatch/control.py
+++ b/awx/main/dispatch/control.py
@@ -2,6 +2,9 @@ import logging
 import uuid
 import json
 
+from django.conf import settings
+import redis
+
 from awx.main.dispatch import get_local_queuename
 from . import pg_bus_conn
 
@@ -21,7 +24,9 @@ class Control(object):
         self.queuename = host or get_local_queuename()
 
     def status(self, *args, **kwargs):
-        return self.control_with_reply('status', *args, **kwargs)
+        r = redis.Redis.from_url(settings.BROKER_URL)
+        stats = r.get(f'awx_{self.service}_statistics') or b''
+        return stats.decode('utf-8')
 
     def running(self, *args, **kwargs):
         return self.control_with_reply('running', *args, **kwargs)
diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py
index f5b38262ad..dc97402788 100644
--- a/awx/main/dispatch/pool.py
+++ b/awx/main/dispatch/pool.py
@@ -5,6 +5,7 @@ import signal
 import sys
 import time
 import traceback
+from datetime import datetime
 from uuid import uuid4
 
 import collections
@@ -27,6 +28,12 @@ else:
     logger = logging.getLogger('awx.main.dispatch')
 
 
+class NoOpResultQueue(object):
+
+    def put(self, item):
+        pass
+
+
 class PoolWorker(object):
     '''
     Used to track a worker child process and its pending and finished messages.
@@ -56,11 +63,13 @@ class PoolWorker(object):
     It is "idle" when self.managed_tasks is empty.
     '''
 
-    def __init__(self, queue_size, target, args):
+    track_managed_tasks = False
+
+    def __init__(self, queue_size, target, args, **kwargs):
         self.messages_sent = 0
         self.messages_finished = 0
         self.managed_tasks = collections.OrderedDict()
-        self.finished = MPQueue(queue_size)
+        self.finished = MPQueue(queue_size) if self.track_managed_tasks else NoOpResultQueue()
         self.queue = MPQueue(queue_size)
         self.process = Process(target=target, args=(self.queue, self.finished) + args)
         self.process.daemon = True
@@ -74,7 +83,8 @@ class PoolWorker(object):
         if not body.get('uuid'):
             body['uuid'] = str(uuid4())
         uuid = body['uuid']
-        self.managed_tasks[uuid] = body
+        if self.track_managed_tasks:
+            self.managed_tasks[uuid] = body
         self.queue.put(body, block=True, timeout=5)
         self.messages_sent += 1
         self.calculate_managed_tasks()
@@ -111,6 +121,8 @@ class PoolWorker(object):
         return str(self.process.exitcode)
 
     def calculate_managed_tasks(self):
+        if not self.track_managed_tasks:
+            return
         # look to see if any tasks were finished
         finished = []
         for _ in range(self.finished.qsize()):
@@ -135,6 +147,8 @@ class PoolWorker(object):
 
     @property
     def current_task(self):
+        if not self.track_managed_tasks:
+            return None
         self.calculate_managed_tasks()
         # the task at [0] is the one that's running right now (or is about to
         # be running)
@@ -145,6 +159,8 @@ class PoolWorker(object):
 
     @property
     def orphaned_tasks(self):
+        if not self.track_managed_tasks:
+            return []
         orphaned = []
         if not self.alive:
             # if this process had a running task that never finished,
@@ -179,6 +195,11 @@ class PoolWorker(object):
         return not self.busy
 
 
+class StatefulPoolWorker(PoolWorker):
+
+    track_managed_tasks = True
+
+
 class WorkerPool(object):
     '''
     Creates a pool of forked PoolWorkers.
@@ -200,6 +221,7 @@ class WorkerPool(object):
     )
     '''
 
+    pool_cls = PoolWorker
     debug_meta = ''
 
     def __init__(self, min_workers=None, queue_size=None):
@@ -225,7 +247,7 @@ class WorkerPool(object):
         # for the DB and cache connections (that way lies race conditions)
         django_connection.close()
         django_cache.close()
-        worker = PoolWorker(self.queue_size, self.target, (idx,) + self.target_args)
+        worker = self.pool_cls(self.queue_size, self.target, (idx,) + self.target_args)
         self.workers.append(worker)
         try:
             worker.start()
@@ -236,13 +258,13 @@ class WorkerPool(object):
         return idx, worker
 
     def debug(self, *args, **kwargs):
-        self.cleanup()
         tmpl = Template(
+            'Recorded at: {{ dt }} \n'
             '{{ pool.name }}[pid:{{ pool.pid }}] workers total={{ workers|length }} {{ meta }} \n'
             '{% for w in workers %}'
             '. worker[pid:{{ w.pid }}]{% if not w.alive %} GONE exit={{ w.exitcode }}{% endif %}'
             ' sent={{ w.messages_sent }}'
-            ' finished={{ w.messages_finished }}'
+            '{% if w.messages_finished %} finished={{ w.messages_finished }}{% endif %}'
             ' qsize={{ w.managed_tasks|length }}'
             ' rss={{ w.mb }}MB'
             '{% for task in w.managed_tasks.values() %}'
@@ -260,7 +282,11 @@ class WorkerPool(object):
             '\n'
             '{% endfor %}'
         )
-        return tmpl.render(pool=self, workers=self.workers, meta=self.debug_meta)
+        now = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')
+        return tmpl.render(
+            pool=self, workers=self.workers, meta=self.debug_meta,
+            dt=now
+        )
 
     def write(self, preferred_queue, body):
         queue_order = sorted(range(len(self.workers)), key=lambda x: -1 if x==preferred_queue else x)
@@ -293,6 +319,8 @@ class AutoscalePool(WorkerPool):
     down based on demand
     '''
 
+    pool_cls = StatefulPoolWorker
+
     def __init__(self, *args, **kwargs):
         self.max_workers = kwargs.pop('max_workers', None)
         super(AutoscalePool, self).__init__(*args, **kwargs)
@@ -309,6 +337,10 @@ class AutoscalePool(WorkerPool):
         # max workers can't be less than min_workers
         self.max_workers = max(self.min_workers, self.max_workers)
 
+    def debug(self, *args, **kwargs):
+        self.cleanup()
+        return super(AutoscalePool, self).debug(*args, **kwargs)
+
     @property
     def should_grow(self):
         if len(self.workers) < self.min_workers:
diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py
index 7001cd9bb9..c5596112b8 100644
--- a/awx/main/dispatch/worker/base.py
+++ b/awx/main/dispatch/worker/base.py
@@ -43,6 +43,9 @@ class WorkerSignalHandler:
 
 
 class AWXConsumerBase(object):
+
+    last_stats = time.time()
+
     def __init__(self, name, worker, queues=[], pool=None):
         self.should_stop = False
 
@@ -54,6 +57,7 @@ class AWXConsumerBase(object):
         if pool is None:
             self.pool = WorkerPool()
         self.pool.init_workers(self.worker.work_loop)
+        self.redis = redis.Redis.from_url(settings.BROKER_URL)
 
     @property
     def listening_on(self):
@@ -99,6 +103,16 @@ class AWXConsumerBase(object):
             queue = 0
         self.pool.write(queue, body)
         self.total_messages += 1
+        self.record_statistics()
+
+    def record_statistics(self):
+        if time.time() - self.last_stats > 1:  # buffer stat recording to once per second
+            try:
+                self.redis.set(f'awx_{self.name}_statistics', self.pool.debug())
+                self.last_stats = time.time()
+            except Exception:
+                logger.exception(f"encountered an error communicating with redis to store {self.name} statistics")
+                self.last_stats = time.time()
 
     def run(self, *args, **kwargs):
         signal.signal(signal.SIGINT, self.stop)
@@ -120,10 +134,9 @@ class AWXConsumerRedis(AWXConsumerBase):
         time_to_sleep = 1
 
         while True:
-            queue = redis.Redis.from_url(settings.BROKER_URL)
             while True:
                 try:
-                    res = queue.blpop(self.queues)
+                    res = self.redis.blpop(self.queues)
                     time_to_sleep = 1
                     res = json.loads(res[1])
                     self.process_task(res)
diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py
index 7e28330067..23922a7537 100644
--- a/awx/main/management/commands/run_callback_receiver.py
+++ b/awx/main/management/commands/run_callback_receiver.py
@@ -4,6 +4,7 @@
 from django.conf import settings
 from django.core.management.base import BaseCommand
 
+from awx.main.dispatch.control import Control
 from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker
 
 
@@ -15,7 +16,14 @@ class Command(BaseCommand):
     '''
     help = 'Launch the job callback receiver'
 
+    def add_arguments(self, parser):
+        parser.add_argument('--status', dest='status', action='store_true',
+                            help='print the internal state of any running dispatchers')
+
     def handle(self, *arg, **options):
+        if options.get('status'):
+            print(Control('callback_receiver').status())
+            return
         consumer = None
         try:
             consumer = AWXConsumerRedis(
diff --git a/awx/main/tests/functional/test_dispatch.py b/awx/main/tests/functional/test_dispatch.py
index caf54f0161..e92867d6a5 100644
--- a/awx/main/tests/functional/test_dispatch.py
+++ b/awx/main/tests/functional/test_dispatch.py
@@ -10,7 +10,7 @@ import pytest
 
 from awx.main.models import Job, WorkflowJob, Instance
 from awx.main.dispatch import reaper
-from awx.main.dispatch.pool import PoolWorker, WorkerPool, AutoscalePool
+from awx.main.dispatch.pool import StatefulPoolWorker, WorkerPool, AutoscalePool
 from awx.main.dispatch.publish import task
 from awx.main.dispatch.worker import BaseWorker, TaskWorker
 
@@ -80,7 +80,7 @@ class SlowResultWriter(BaseWorker):
 class TestPoolWorker:
 
     def setup_method(self, test_method):
-        self.worker = PoolWorker(1000, self.tick, tuple())
+        self.worker = StatefulPoolWorker(1000, self.tick, tuple())
 
     def tick(self):
         self.worker.finished.put(self.worker.queue.get()['uuid'])
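
Usage sketch (not part of the patch): with the changes above applied and a callback receiver running, the recorded pool state can be printed with "awx-manage run_callback_receiver --status", or read straight from redis. The key name follows the f'awx_{service}_statistics' pattern introduced in control.py; the assumption here is that Django settings are already loaded (for example inside an awx-manage shell_plus session).

    # Minimal sketch: read the stats blob the consumer publishes roughly once
    # per second via record_statistics() for the 'callback_receiver' service.
    # Assumes a configured Django settings module so BROKER_URL resolves.
    import redis
    from django.conf import settings

    r = redis.Redis.from_url(settings.BROKER_URL)
    stats = r.get('awx_callback_receiver_statistics') or b''
    print(stats.decode('utf-8'))

This is the same value Control('callback_receiver').status() returns, so the new --status flag and a direct redis read should agree.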