Merge pull request #8167 from ryanpetrello/callback-cleanup

Add support for a `--status` flag to the callback receiver (and improve our approach to stats collection in general)

Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
This commit is contained in:
softwarefactory-project-zuul[bot]
2020-09-18 13:58:30 +00:00
committed by GitHub
5 changed files with 70 additions and 12 deletions

View File

@@ -2,6 +2,9 @@ import logging
import uuid import uuid
import json import json
from django.conf import settings
import redis
from awx.main.dispatch import get_local_queuename from awx.main.dispatch import get_local_queuename
from . import pg_bus_conn from . import pg_bus_conn
@@ -21,7 +24,9 @@ class Control(object):
self.queuename = host or get_local_queuename() self.queuename = host or get_local_queuename()
def status(self, *args, **kwargs): def status(self, *args, **kwargs):
return self.control_with_reply('status', *args, **kwargs) r = redis.Redis.from_url(settings.BROKER_URL)
stats = r.get(f'awx_{self.service}_statistics') or b''
return stats.decode('utf-8')
def running(self, *args, **kwargs): def running(self, *args, **kwargs):
return self.control_with_reply('running', *args, **kwargs) return self.control_with_reply('running', *args, **kwargs)

View File

@@ -5,6 +5,7 @@ import signal
import sys import sys
import time import time
import traceback import traceback
from datetime import datetime
from uuid import uuid4 from uuid import uuid4
import collections import collections
@@ -27,6 +28,12 @@ else:
logger = logging.getLogger('awx.main.dispatch') logger = logging.getLogger('awx.main.dispatch')
# Stand-in for a result queue: exposes put() but discards whatever it is given.
# PoolWorker assigns an instance of this to self.finished when
# track_managed_tasks is false, in place of an MPQueue.
class NoOpResultQueue(object):
def put(self, item):
# intentionally ignores the item
pass
class PoolWorker(object): class PoolWorker(object):
''' '''
Used to track a worker child process and its pending and finished messages. Used to track a worker child process and its pending and finished messages.
@@ -56,11 +63,13 @@ class PoolWorker(object):
It is "idle" when self.managed_tasks is empty. It is "idle" when self.managed_tasks is empty.
''' '''
def __init__(self, queue_size, target, args): track_managed_tasks = False
def __init__(self, queue_size, target, args, **kwargs):
self.messages_sent = 0 self.messages_sent = 0
self.messages_finished = 0 self.messages_finished = 0
self.managed_tasks = collections.OrderedDict() self.managed_tasks = collections.OrderedDict()
self.finished = MPQueue(queue_size) self.finished = MPQueue(queue_size) if self.track_managed_tasks else NoOpResultQueue()
self.queue = MPQueue(queue_size) self.queue = MPQueue(queue_size)
self.process = Process(target=target, args=(self.queue, self.finished) + args) self.process = Process(target=target, args=(self.queue, self.finished) + args)
self.process.daemon = True self.process.daemon = True
@@ -74,7 +83,8 @@ class PoolWorker(object):
if not body.get('uuid'): if not body.get('uuid'):
body['uuid'] = str(uuid4()) body['uuid'] = str(uuid4())
uuid = body['uuid'] uuid = body['uuid']
self.managed_tasks[uuid] = body if self.track_managed_tasks:
self.managed_tasks[uuid] = body
self.queue.put(body, block=True, timeout=5) self.queue.put(body, block=True, timeout=5)
self.messages_sent += 1 self.messages_sent += 1
self.calculate_managed_tasks() self.calculate_managed_tasks()
@@ -111,6 +121,8 @@ class PoolWorker(object):
return str(self.process.exitcode) return str(self.process.exitcode)
def calculate_managed_tasks(self): def calculate_managed_tasks(self):
if not self.track_managed_tasks:
return
# look to see if any tasks were finished # look to see if any tasks were finished
finished = [] finished = []
for _ in range(self.finished.qsize()): for _ in range(self.finished.qsize()):
@@ -135,6 +147,8 @@ class PoolWorker(object):
@property @property
def current_task(self): def current_task(self):
if not self.track_managed_tasks:
return None
self.calculate_managed_tasks() self.calculate_managed_tasks()
# the task at [0] is the one that's running right now (or is about to # the task at [0] is the one that's running right now (or is about to
# be running) # be running)
@@ -145,6 +159,8 @@ class PoolWorker(object):
@property @property
def orphaned_tasks(self): def orphaned_tasks(self):
if not self.track_managed_tasks:
return []
orphaned = [] orphaned = []
if not self.alive: if not self.alive:
# if this process had a running task that never finished, # if this process had a running task that never finished,
@@ -179,6 +195,11 @@ class PoolWorker(object):
return not self.busy return not self.busy
# PoolWorker variant with managed-task tracking switched on.
# The base PoolWorker declares track_managed_tasks = False; this subclass only
# flips that class attribute.
class StatefulPoolWorker(PoolWorker):
track_managed_tasks = True
class WorkerPool(object): class WorkerPool(object):
''' '''
Creates a pool of forked PoolWorkers. Creates a pool of forked PoolWorkers.
@@ -200,6 +221,7 @@ class WorkerPool(object):
) )
''' '''
pool_cls = PoolWorker
debug_meta = '' debug_meta = ''
def __init__(self, min_workers=None, queue_size=None): def __init__(self, min_workers=None, queue_size=None):
@@ -225,7 +247,7 @@ class WorkerPool(object):
# for the DB and cache connections (that way lies race conditions) # for the DB and cache connections (that way lies race conditions)
django_connection.close() django_connection.close()
django_cache.close() django_cache.close()
worker = PoolWorker(self.queue_size, self.target, (idx,) + self.target_args) worker = self.pool_cls(self.queue_size, self.target, (idx,) + self.target_args)
self.workers.append(worker) self.workers.append(worker)
try: try:
worker.start() worker.start()
@@ -236,13 +258,13 @@ class WorkerPool(object):
return idx, worker return idx, worker
def debug(self, *args, **kwargs): def debug(self, *args, **kwargs):
self.cleanup()
tmpl = Template( tmpl = Template(
'Recorded at: {{ dt }} \n'
'{{ pool.name }}[pid:{{ pool.pid }}] workers total={{ workers|length }} {{ meta }} \n' '{{ pool.name }}[pid:{{ pool.pid }}] workers total={{ workers|length }} {{ meta }} \n'
'{% for w in workers %}' '{% for w in workers %}'
'. worker[pid:{{ w.pid }}]{% if not w.alive %} GONE exit={{ w.exitcode }}{% endif %}' '. worker[pid:{{ w.pid }}]{% if not w.alive %} GONE exit={{ w.exitcode }}{% endif %}'
' sent={{ w.messages_sent }}' ' sent={{ w.messages_sent }}'
' finished={{ w.messages_finished }}' '{% if w.messages_finished %} finished={{ w.messages_finished }}{% endif %}'
' qsize={{ w.managed_tasks|length }}' ' qsize={{ w.managed_tasks|length }}'
' rss={{ w.mb }}MB' ' rss={{ w.mb }}MB'
'{% for task in w.managed_tasks.values() %}' '{% for task in w.managed_tasks.values() %}'
@@ -260,7 +282,11 @@ class WorkerPool(object):
'\n' '\n'
'{% endfor %}' '{% endfor %}'
) )
return tmpl.render(pool=self, workers=self.workers, meta=self.debug_meta) now = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')
return tmpl.render(
pool=self, workers=self.workers, meta=self.debug_meta,
dt=now
)
def write(self, preferred_queue, body): def write(self, preferred_queue, body):
queue_order = sorted(range(len(self.workers)), key=lambda x: -1 if x==preferred_queue else x) queue_order = sorted(range(len(self.workers)), key=lambda x: -1 if x==preferred_queue else x)
@@ -293,6 +319,8 @@ class AutoscalePool(WorkerPool):
down based on demand down based on demand
''' '''
pool_cls = StatefulPoolWorker
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.max_workers = kwargs.pop('max_workers', None) self.max_workers = kwargs.pop('max_workers', None)
super(AutoscalePool, self).__init__(*args, **kwargs) super(AutoscalePool, self).__init__(*args, **kwargs)
@@ -309,6 +337,10 @@ class AutoscalePool(WorkerPool):
# max workers can't be less than min_workers # max workers can't be less than min_workers
self.max_workers = max(self.min_workers, self.max_workers) self.max_workers = max(self.min_workers, self.max_workers)
# Call self.cleanup() first, then return the parent class's debug() output,
# forwarding all positional and keyword arguments unchanged.
def debug(self, *args, **kwargs):
self.cleanup()
return super(AutoscalePool, self).debug(*args, **kwargs)
@property @property
def should_grow(self): def should_grow(self):
if len(self.workers) < self.min_workers: if len(self.workers) < self.min_workers:

View File

@@ -43,6 +43,9 @@ class WorkerSignalHandler:
class AWXConsumerBase(object): class AWXConsumerBase(object):
last_stats = time.time()
def __init__(self, name, worker, queues=[], pool=None): def __init__(self, name, worker, queues=[], pool=None):
self.should_stop = False self.should_stop = False
@@ -54,6 +57,7 @@ class AWXConsumerBase(object):
if pool is None: if pool is None:
self.pool = WorkerPool() self.pool = WorkerPool()
self.pool.init_workers(self.worker.work_loop) self.pool.init_workers(self.worker.work_loop)
self.redis = redis.Redis.from_url(settings.BROKER_URL)
@property @property
def listening_on(self): def listening_on(self):
@@ -99,6 +103,16 @@ class AWXConsumerBase(object):
queue = 0 queue = 0
self.pool.write(queue, body) self.pool.write(queue, body)
self.total_messages += 1 self.total_messages += 1
self.record_statistics()
# Store the output of self.pool.debug() in redis under the key
# 'awx_<self.name>_statistics'. Does nothing unless more than one second has
# elapsed since self.last_stats.
def record_statistics(self):
if time.time() - self.last_stats > 1: # buffer stat recording to once per second
try:
self.redis.set(f'awx_{self.name}_statistics', self.pool.debug())
self.last_stats = time.time()
except Exception:
# Any Exception raised while building or storing the report is logged
# instead of propagating; the timestamp is still advanced, so the next
# attempt waits for the one-second window again.
logger.exception(f"encountered an error communicating with redis to store {self.name} statistics")
self.last_stats = time.time()
def run(self, *args, **kwargs): def run(self, *args, **kwargs):
signal.signal(signal.SIGINT, self.stop) signal.signal(signal.SIGINT, self.stop)
@@ -120,10 +134,9 @@ class AWXConsumerRedis(AWXConsumerBase):
time_to_sleep = 1 time_to_sleep = 1
while True: while True:
queue = redis.Redis.from_url(settings.BROKER_URL)
while True: while True:
try: try:
res = queue.blpop(self.queues) res = self.redis.blpop(self.queues)
time_to_sleep = 1 time_to_sleep = 1
res = json.loads(res[1]) res = json.loads(res[1])
self.process_task(res) self.process_task(res)

View File

@@ -4,6 +4,7 @@
from django.conf import settings from django.conf import settings
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from awx.main.dispatch.control import Control
from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker
@@ -15,7 +16,14 @@ class Command(BaseCommand):
''' '''
help = 'Launch the job callback receiver' help = 'Launch the job callback receiver'
# Register the optional --status switch on the given parser; it is a
# store_true flag saved under the 'status' destination.
# NOTE(review): the help text says "dispatchers" while this command's own help
# string is 'Launch the job callback receiver' — confirm the intended wording.
def add_arguments(self, parser):
parser.add_argument('--status', dest='status', action='store_true',
help='print the internal state of any running dispatchers')
def handle(self, *arg, **options): def handle(self, *arg, **options):
if options.get('status'):
print(Control('callback_receiver').status())
return
consumer = None consumer = None
try: try:
consumer = AWXConsumerRedis( consumer = AWXConsumerRedis(

View File

@@ -10,7 +10,7 @@ import pytest
from awx.main.models import Job, WorkflowJob, Instance from awx.main.models import Job, WorkflowJob, Instance
from awx.main.dispatch import reaper from awx.main.dispatch import reaper
from awx.main.dispatch.pool import PoolWorker, WorkerPool, AutoscalePool from awx.main.dispatch.pool import StatefulPoolWorker, WorkerPool, AutoscalePool
from awx.main.dispatch.publish import task from awx.main.dispatch.publish import task
from awx.main.dispatch.worker import BaseWorker, TaskWorker from awx.main.dispatch.worker import BaseWorker, TaskWorker
@@ -80,7 +80,7 @@ class SlowResultWriter(BaseWorker):
class TestPoolWorker: class TestPoolWorker:
def setup_method(self, test_method): def setup_method(self, test_method):
self.worker = PoolWorker(1000, self.tick, tuple()) self.worker = StatefulPoolWorker(1000, self.tick, tuple())
def tick(self): def tick(self):
self.worker.finished.put(self.worker.queue.get()['uuid']) self.worker.finished.put(self.worker.queue.get()['uuid'])