diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py index 6b3c13499d..c7a623c8d9 100644 --- a/awx/main/dispatch/control.py +++ b/awx/main/dispatch/control.py @@ -2,6 +2,9 @@ import logging import uuid import json +from django.conf import settings +import redis + from awx.main.dispatch import get_local_queuename from . import pg_bus_conn @@ -21,7 +24,9 @@ class Control(object): self.queuename = host or get_local_queuename() def status(self, *args, **kwargs): - return self.control_with_reply('status', *args, **kwargs) + r = redis.Redis.from_url(settings.BROKER_URL) + stats = r.get(f'awx_{self.service}_statistics') or b'' + return stats.decode('utf-8') def running(self, *args, **kwargs): return self.control_with_reply('running', *args, **kwargs) diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index c37cd67763..dc97402788 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -5,6 +5,7 @@ import signal import sys import time import traceback +from datetime import datetime from uuid import uuid4 import collections @@ -257,13 +258,13 @@ class WorkerPool(object): return idx, worker def debug(self, *args, **kwargs): - self.cleanup() tmpl = Template( + 'Recorded at: {{ dt }} \n' '{{ pool.name }}[pid:{{ pool.pid }}] workers total={{ workers|length }} {{ meta }} \n' '{% for w in workers %}' '. worker[pid:{{ w.pid }}]{% if not w.alive %} GONE exit={{ w.exitcode }}{% endif %}' ' sent={{ w.messages_sent }}' - ' finished={{ w.messages_finished }}' + '{% if w.messages_finished %} finished={{ w.messages_finished }}{% endif %}' ' qsize={{ w.managed_tasks|length }}' ' rss={{ w.mb }}MB' '{% for task in w.managed_tasks.values() %}' @@ -281,7 +282,11 @@ class WorkerPool(object): '\n' '{% endfor %}' ) - return tmpl.render(pool=self, workers=self.workers, meta=self.debug_meta) + now = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC') + return tmpl.render( + pool=self, workers=self.workers, meta=self.debug_meta, + dt=now + ) def write(self, preferred_queue, body): queue_order = sorted(range(len(self.workers)), key=lambda x: -1 if x==preferred_queue else x) @@ -332,6 +337,10 @@ class AutoscalePool(WorkerPool): # max workers can't be less than min_workers self.max_workers = max(self.min_workers, self.max_workers) + def debug(self, *args, **kwargs): + self.cleanup() + return super(AutoscalePool, self).debug(*args, **kwargs) + @property def should_grow(self): if len(self.workers) < self.min_workers: diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index 7001cd9bb9..c5596112b8 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -43,6 +43,9 @@ class WorkerSignalHandler: class AWXConsumerBase(object): + + last_stats = time.time() + def __init__(self, name, worker, queues=[], pool=None): self.should_stop = False @@ -54,6 +57,7 @@ class AWXConsumerBase(object): if pool is None: self.pool = WorkerPool() self.pool.init_workers(self.worker.work_loop) + self.redis = redis.Redis.from_url(settings.BROKER_URL) @property def listening_on(self): @@ -99,6 +103,16 @@ class AWXConsumerBase(object): queue = 0 self.pool.write(queue, body) self.total_messages += 1 + self.record_statistics() + + def record_statistics(self): + if time.time() - self.last_stats > 1: # buffer stat recording to once per second + try: + self.redis.set(f'awx_{self.name}_statistics', self.pool.debug()) + self.last_stats = time.time() + except Exception: + logger.exception(f"encountered an error communicating with redis to store {self.name} statistics") + self.last_stats = time.time() def run(self, *args, **kwargs): signal.signal(signal.SIGINT, self.stop) @@ -120,10 +134,9 @@ class AWXConsumerRedis(AWXConsumerBase): time_to_sleep = 1 while True: - queue = redis.Redis.from_url(settings.BROKER_URL) while True: try: - res = queue.blpop(self.queues) + res = self.redis.blpop(self.queues) time_to_sleep = 1 res = json.loads(res[1]) self.process_task(res) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 7e28330067..23922a7537 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -4,6 +4,7 @@ from django.conf import settings from django.core.management.base import BaseCommand +from awx.main.dispatch.control import Control from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker @@ -15,7 +16,14 @@ class Command(BaseCommand): ''' help = 'Launch the job callback receiver' + def add_arguments(self, parser): + parser.add_argument('--status', dest='status', action='store_true', + help='print the internal state of any running dispatchers') + def handle(self, *arg, **options): + if options.get('status'): + print(Control('callback_receiver').status()) + return consumer = None try: consumer = AWXConsumerRedis( diff --git a/awx/main/tests/functional/test_dispatch.py b/awx/main/tests/functional/test_dispatch.py index caf54f0161..e92867d6a5 100644 --- a/awx/main/tests/functional/test_dispatch.py +++ b/awx/main/tests/functional/test_dispatch.py @@ -10,7 +10,7 @@ import pytest from awx.main.models import Job, WorkflowJob, Instance from awx.main.dispatch import reaper -from awx.main.dispatch.pool import PoolWorker, WorkerPool, AutoscalePool +from awx.main.dispatch.pool import StatefulPoolWorker, WorkerPool, AutoscalePool from awx.main.dispatch.publish import task from awx.main.dispatch.worker import BaseWorker, TaskWorker @@ -80,7 +80,7 @@ class SlowResultWriter(BaseWorker): class TestPoolWorker: def setup_method(self, test_method): - self.worker = PoolWorker(1000, self.tick, tuple()) + self.worker = StatefulPoolWorker(1000, self.tick, tuple()) def tick(self): self.worker.finished.put(self.worker.queue.get()['uuid'])