From cd0b9de7b92f952506afb27d60936664b5a7c525 Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Mon, 21 Sep 2020 10:48:42 -0400 Subject: [PATCH] remove multiprocessing.Queue usage from the callback receiver instead, just have each worker connect directly to redis this has a few benefits: - it's simpler to explain and debug - back pressure on the queue keeps messages around in redis (which is observable, and survives the restart of Python processes) - it's likely notably more performant at high loads --- awx/main/dispatch/control.py | 10 ++++- awx/main/dispatch/worker/base.py | 17 +------- awx/main/dispatch/worker/callback.py | 58 +++++++++++++++++++++++----- awx/settings/defaults.py | 8 ++++ 4 files changed, 67 insertions(+), 26 deletions(-) diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py index c7a623c8d9..47cc60b40d 100644 --- a/awx/main/dispatch/control.py +++ b/awx/main/dispatch/control.py @@ -25,8 +25,14 @@ class Control(object): def status(self, *args, **kwargs): r = redis.Redis.from_url(settings.BROKER_URL) - stats = r.get(f'awx_{self.service}_statistics') or b'' - return stats.decode('utf-8') + if self.service == 'dispatcher': + stats = r.get(f'awx_{self.service}_statistics') or b'' + return stats.decode('utf-8') + else: + workers = [] + for key in r.keys('awx_callback_receiver_statistics_*'): + workers.append(r.get(key).decode('utf-8')) + return '\n'.join(workers) def running(self, *args, **kwargs): return self.control_with_reply('running', *args, **kwargs) diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index c5596112b8..8b44c71e43 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -132,22 +132,9 @@ class AWXConsumerRedis(AWXConsumerBase): super(AWXConsumerRedis, self).run(*args, **kwargs) self.worker.on_start() - time_to_sleep = 1 while True: - while True: - try: - res = self.redis.blpop(self.queues) - time_to_sleep = 1 - res = json.loads(res[1]) - self.process_task(res) - except redis.exceptions.RedisError: - time_to_sleep = min(time_to_sleep * 2, 30) - logger.exception(f"encountered an error communicating with redis. Reconnect attempt in {time_to_sleep} seconds") - time.sleep(time_to_sleep) - except (json.JSONDecodeError, KeyError): - logger.exception("failed to decode JSON message from redis") - if self.should_stop: - return + logger.debug(f'{os.getpid()} is alive') + time.sleep(60) class AWXConsumerPG(AWXConsumerBase): diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index 1314f33f7c..64270ab174 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -1,4 +1,5 @@ import cProfile +import json import logging import os import pstats @@ -6,13 +7,16 @@ import signal import tempfile import time import traceback -from queue import Empty as QueueEmpty from django.conf import settings from django.utils.timezone import now as tz_now from django.db import DatabaseError, OperationalError, connection as django_connection from django.db.utils import InterfaceError, InternalError, IntegrityError +import psutil + +import redis + from awx.main.consumers import emit_channel_notification from awx.main.models import (JobEvent, AdHocCommandEvent, ProjectUpdateEvent, InventoryUpdateEvent, SystemJobEvent, UnifiedJob, @@ -24,10 +28,6 @@ from .base import BaseWorker logger = logging.getLogger('awx.main.commands.run_callback_receiver') -# the number of seconds to buffer events in memory before flushing -# using JobEvent.objects.bulk_create() -BUFFER_SECONDS = .1 - class CallbackBrokerWorker(BaseWorker): ''' @@ -39,21 +39,57 @@ class CallbackBrokerWorker(BaseWorker): ''' MAX_RETRIES = 2 + last_stats = time.time() + total = 0 + last_event = '' prof = None def __init__(self): self.buff = {} + self.pid = os.getpid() + self.redis = redis.Redis.from_url(settings.BROKER_URL) + for key in self.redis.keys('awx_callback_receiver_statistics_*'): + self.redis.delete(key) def read(self, queue): try: - return queue.get(block=True, timeout=BUFFER_SECONDS) - except QueueEmpty: - return {'event': 'FLUSH'} + res = self.redis.blpop(settings.CALLBACK_QUEUE, timeout=settings.JOB_EVENT_BUFFER_SECONDS) + if res is None: + return {'event': 'FLUSH'} + self.total += 1 + return json.loads(res[1]) + except redis.exceptions.RedisError: + logger.exception("encountered an error communicating with redis") + time.sleep(1) + except (json.JSONDecodeError, KeyError): + logger.exception("failed to decode JSON message from redis") + finally: + self.record_statistics() + return {'event': 'FLUSH'} + + def record_statistics(self): + # buffer stat recording to once per (by default) 5s + if time.time() - self.last_stats > settings.JOB_EVENT_STATISTICS_INTERVAL: + try: + self.redis.set(f'awx_callback_receiver_statistics_{self.pid}', self.debug()) + self.last_stats = time.time() + except Exception: + logger.exception("encountered an error communicating with redis") + self.last_stats = time.time() + + def debug(self): + return f'. worker[pid:{self.pid}] sent={self.total} rss={self.mb}MB {self.last_event}' + + @property + def mb(self): + return '{:0.3f}'.format( + psutil.Process(self.pid).memory_info().rss / 1024.0 / 1024.0 + ) def toggle_profiling(self, *args): if self.prof: self.prof.disable() - filename = f'callback-{os.getpid()}.pstats' + filename = f'callback-{self.pid}.pstats' filepath = os.path.join(tempfile.gettempdir(), filename) with open(filepath, 'w') as f: pstats.Stats(self.prof, stream=f).sort_stats('cumulative').print_stats() @@ -108,6 +144,8 @@ class CallbackBrokerWorker(BaseWorker): def perform_work(self, body): try: flush = body.get('event') == 'FLUSH' + if flush: + self.last_event = '' if not flush: event_map = { 'job_id': JobEvent, @@ -123,6 +161,8 @@ class CallbackBrokerWorker(BaseWorker): job_identifier = body[key] break + self.last_event = f'\n\t- {cls.__name__} for #{job_identifier} ({body.get("event", "")} {body.get("uuid", "")})' # noqa + if body.get('event') == 'EOF': try: final_counter = body.get('final_counter', 0) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 2d5c3085fe..618f5282a4 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -197,6 +197,14 @@ LOCAL_STDOUT_EXPIRE_TIME = 2592000 # events into the database JOB_EVENT_WORKERS = 4 +# The number of seconds (must be an integer) to buffer callback receiver bulk +# writes in memory before flushing via JobEvent.objects.bulk_create() +JOB_EVENT_BUFFER_SECONDS = 1 + +# The interval at which callback receiver statistics should be +# recorded +JOB_EVENT_STATISTICS_INTERVAL = 5 + # The maximum size of the job event worker queue before requests are blocked JOB_EVENT_MAX_QUEUE_SIZE = 10000