make --status more robust for dispatcher, and add support for receiver

make the --status flag work by fetching a periodically recorded snapshot
of internal process state; additionally, update the callback receiver to
*also* record these statistics so we can gain more insight into any
performance issues
This commit is contained in:
Ryan Petrello
2020-09-17 14:25:00 -04:00
parent 0df6409244
commit 57f8e48894
5 changed files with 43 additions and 8 deletions

View File

@@ -2,6 +2,9 @@ import logging
import uuid import uuid
import json import json
from django.conf import settings
import redis
from awx.main.dispatch import get_local_queuename from awx.main.dispatch import get_local_queuename
from . import pg_bus_conn from . import pg_bus_conn
@@ -21,7 +24,9 @@ class Control(object):
self.queuename = host or get_local_queuename() self.queuename = host or get_local_queuename()
def status(self, *args, **kwargs): def status(self, *args, **kwargs):
return self.control_with_reply('status', *args, **kwargs) r = redis.Redis.from_url(settings.BROKER_URL)
stats = r.get(f'awx_{self.service}_statistics') or b''
return stats.decode('utf-8')
def running(self, *args, **kwargs): def running(self, *args, **kwargs):
return self.control_with_reply('running', *args, **kwargs) return self.control_with_reply('running', *args, **kwargs)

View File

@@ -5,6 +5,7 @@ import signal
import sys import sys
import time import time
import traceback import traceback
from datetime import datetime
from uuid import uuid4 from uuid import uuid4
import collections import collections
@@ -257,13 +258,13 @@ class WorkerPool(object):
return idx, worker return idx, worker
def debug(self, *args, **kwargs): def debug(self, *args, **kwargs):
self.cleanup()
tmpl = Template( tmpl = Template(
'Recorded at: {{ dt }} \n'
'{{ pool.name }}[pid:{{ pool.pid }}] workers total={{ workers|length }} {{ meta }} \n' '{{ pool.name }}[pid:{{ pool.pid }}] workers total={{ workers|length }} {{ meta }} \n'
'{% for w in workers %}' '{% for w in workers %}'
'. worker[pid:{{ w.pid }}]{% if not w.alive %} GONE exit={{ w.exitcode }}{% endif %}' '. worker[pid:{{ w.pid }}]{% if not w.alive %} GONE exit={{ w.exitcode }}{% endif %}'
' sent={{ w.messages_sent }}' ' sent={{ w.messages_sent }}'
' finished={{ w.messages_finished }}' '{% if w.messages_finished %} finished={{ w.messages_finished }}{% endif %}'
' qsize={{ w.managed_tasks|length }}' ' qsize={{ w.managed_tasks|length }}'
' rss={{ w.mb }}MB' ' rss={{ w.mb }}MB'
'{% for task in w.managed_tasks.values() %}' '{% for task in w.managed_tasks.values() %}'
@@ -281,7 +282,11 @@ class WorkerPool(object):
'\n' '\n'
'{% endfor %}' '{% endfor %}'
) )
return tmpl.render(pool=self, workers=self.workers, meta=self.debug_meta) now = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')
return tmpl.render(
pool=self, workers=self.workers, meta=self.debug_meta,
dt=now
)
def write(self, preferred_queue, body): def write(self, preferred_queue, body):
queue_order = sorted(range(len(self.workers)), key=lambda x: -1 if x==preferred_queue else x) queue_order = sorted(range(len(self.workers)), key=lambda x: -1 if x==preferred_queue else x)
@@ -332,6 +337,10 @@ class AutoscalePool(WorkerPool):
# max workers can't be less than min_workers # max workers can't be less than min_workers
self.max_workers = max(self.min_workers, self.max_workers) self.max_workers = max(self.min_workers, self.max_workers)
def debug(self, *args, **kwargs):
self.cleanup()
return super(AutoscalePool, self).debug(*args, **kwargs)
@property @property
def should_grow(self): def should_grow(self):
if len(self.workers) < self.min_workers: if len(self.workers) < self.min_workers:

View File

@@ -43,6 +43,9 @@ class WorkerSignalHandler:
class AWXConsumerBase(object): class AWXConsumerBase(object):
last_stats = time.time()
def __init__(self, name, worker, queues=[], pool=None): def __init__(self, name, worker, queues=[], pool=None):
self.should_stop = False self.should_stop = False
@@ -54,6 +57,7 @@ class AWXConsumerBase(object):
if pool is None: if pool is None:
self.pool = WorkerPool() self.pool = WorkerPool()
self.pool.init_workers(self.worker.work_loop) self.pool.init_workers(self.worker.work_loop)
self.redis = redis.Redis.from_url(settings.BROKER_URL)
@property @property
def listening_on(self): def listening_on(self):
@@ -99,6 +103,16 @@ class AWXConsumerBase(object):
queue = 0 queue = 0
self.pool.write(queue, body) self.pool.write(queue, body)
self.total_messages += 1 self.total_messages += 1
self.record_statistics()
def record_statistics(self):
if time.time() - self.last_stats > 1: # buffer stat recording to once per second
try:
self.redis.set(f'awx_{self.name}_statistics', self.pool.debug())
self.last_stats = time.time()
except Exception:
logger.exception(f"encountered an error communicating with redis to store {self.name} statistics")
self.last_stats = time.time()
def run(self, *args, **kwargs): def run(self, *args, **kwargs):
signal.signal(signal.SIGINT, self.stop) signal.signal(signal.SIGINT, self.stop)
@@ -120,10 +134,9 @@ class AWXConsumerRedis(AWXConsumerBase):
time_to_sleep = 1 time_to_sleep = 1
while True: while True:
queue = redis.Redis.from_url(settings.BROKER_URL)
while True: while True:
try: try:
res = queue.blpop(self.queues) res = self.redis.blpop(self.queues)
time_to_sleep = 1 time_to_sleep = 1
res = json.loads(res[1]) res = json.loads(res[1])
self.process_task(res) self.process_task(res)

View File

@@ -4,6 +4,7 @@
from django.conf import settings from django.conf import settings
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from awx.main.dispatch.control import Control
from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker
@@ -15,7 +16,14 @@ class Command(BaseCommand):
''' '''
help = 'Launch the job callback receiver' help = 'Launch the job callback receiver'
def add_arguments(self, parser):
parser.add_argument('--status', dest='status', action='store_true',
help='print the internal state of any running dispatchers')
def handle(self, *arg, **options): def handle(self, *arg, **options):
if options.get('status'):
print(Control('callback_receiver').status())
return
consumer = None consumer = None
try: try:
consumer = AWXConsumerRedis( consumer = AWXConsumerRedis(

View File

@@ -10,7 +10,7 @@ import pytest
from awx.main.models import Job, WorkflowJob, Instance from awx.main.models import Job, WorkflowJob, Instance
from awx.main.dispatch import reaper from awx.main.dispatch import reaper
from awx.main.dispatch.pool import PoolWorker, WorkerPool, AutoscalePool from awx.main.dispatch.pool import StatefulPoolWorker, WorkerPool, AutoscalePool
from awx.main.dispatch.publish import task from awx.main.dispatch.publish import task
from awx.main.dispatch.worker import BaseWorker, TaskWorker from awx.main.dispatch.worker import BaseWorker, TaskWorker
@@ -80,7 +80,7 @@ class SlowResultWriter(BaseWorker):
class TestPoolWorker: class TestPoolWorker:
def setup_method(self, test_method): def setup_method(self, test_method):
self.worker = PoolWorker(1000, self.tick, tuple()) self.worker = StatefulPoolWorker(1000, self.tick, tuple())
def tick(self): def tick(self):
self.worker.finished.put(self.worker.queue.get()['uuid']) self.worker.finished.put(self.worker.queue.get()['uuid'])