Merge pull request #8191 from ryanpetrello/callback-directly-to-redis

remove multiprocessing.Queue usage from the callback receiver

Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
softwarefactory-project-zuul[bot] committed on 2020-09-24 18:21:49 +00:00 (via GitHub)
4 changed files with 67 additions and 26 deletions
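
The change in a nutshell: instead of a parent process draining Redis and fanning events out to worker processes over a multiprocessing.Queue, each callback receiver worker now runs its own BLPOP against the shared Redis list. A minimal sketch of that consumption pattern, assuming an illustrative broker URL and queue name (AWX takes the real values from settings.BROKER_URL and settings.CALLBACK_QUEUE):

import json

import redis

BROKER_URL = 'redis://localhost:6379/0'  # stand-in for settings.BROKER_URL
CALLBACK_QUEUE = 'callback_tasks'        # stand-in for settings.CALLBACK_QUEUE


def consume_events():
    # Each worker process owns its own connection and blocks on the
    # shared list; Redis hands every popped item to exactly one worker.
    conn = redis.Redis.from_url(BROKER_URL)
    while True:
        res = conn.blpop(CALLBACK_QUEUE, timeout=1)
        if res is None:
            continue  # timed out with nothing queued
        _key, payload = res
        yield json.loads(payload)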

View File: awx/main/dispatch/control.py

@@ -25,8 +25,14 @@ class Control(object):
     def status(self, *args, **kwargs):
         r = redis.Redis.from_url(settings.BROKER_URL)
-        stats = r.get(f'awx_{self.service}_statistics') or b''
-        return stats.decode('utf-8')
+        if self.service == 'dispatcher':
+            stats = r.get(f'awx_{self.service}_statistics') or b''
+            return stats.decode('utf-8')
+        else:
+            workers = []
+            for key in r.keys('awx_callback_receiver_statistics_*'):
+                workers.append(r.get(key).decode('utf-8'))
+            return '\n'.join(workers)

     def running(self, *args, **kwargs):
         return self.control_with_reply('running', *args, **kwargs)
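
Since each callback worker now publishes its statistics under a pid-suffixed key, status for the callback receiver aggregates one line per worker instead of reading a single key. A hedged sketch of the same aggregation outside of AWX (the broker URL is illustrative):

import redis

r = redis.Redis.from_url('redis://localhost:6379/0')

# One key per worker pid, e.g. awx_callback_receiver_statistics_1234;
# join the values exactly as Control.status() does above.
workers = [r.get(key).decode('utf-8')
           for key in r.keys('awx_callback_receiver_statistics_*')]
print('\n'.join(workers))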

View File: awx/main/dispatch/worker/base.py

@@ -132,22 +132,9 @@ class AWXConsumerRedis(AWXConsumerBase):
         super(AWXConsumerRedis, self).run(*args, **kwargs)
         self.worker.on_start()

-        time_to_sleep = 1
         while True:
-            while True:
-                try:
-                    res = self.redis.blpop(self.queues)
-                    time_to_sleep = 1
-                    res = json.loads(res[1])
-                    self.process_task(res)
-                except redis.exceptions.RedisError:
-                    time_to_sleep = min(time_to_sleep * 2, 30)
-                    logger.exception(f"encountered an error communicating with redis. Reconnect attempt in {time_to_sleep} seconds")
-                    time.sleep(time_to_sleep)
-                except (json.JSONDecodeError, KeyError):
-                    logger.exception("failed to decode JSON message from redis")
-            if self.should_stop:
-                return
+            logger.debug(f'{os.getpid()} is alive')
+            time.sleep(60)


 class AWXConsumerPG(AWXConsumerBase):
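
With the workers reading Redis themselves, the parent consumer no longer proxies events and only has to stay alive, so the loop above reduces to a heartbeat log and a sleep. The fan-out now rests on BLPOP semantics: when several processes block on the same list, Redis delivers each pushed item to exactly one of them. A runnable sketch of that behavior, assuming a local Redis and an illustrative queue name:

import multiprocessing
import time

import redis

URL = 'redis://localhost:6379/0'  # illustrative broker URL


def worker():
    conn = redis.Redis.from_url(URL)  # one connection per process
    while True:
        _key, payload = conn.blpop('demo_queue')
        print(multiprocessing.current_process().name, payload.decode())


if __name__ == '__main__':
    for _ in range(4):  # mirrors the JOB_EVENT_WORKERS = 4 default
        multiprocessing.Process(target=worker, daemon=True).start()
    producer = redis.Redis.from_url(URL)
    for i in range(20):
        producer.rpush('demo_queue', f'event-{i}')
    time.sleep(2)  # give the workers time to drain the list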

View File: awx/main/dispatch/worker/callback.py

@@ -1,4 +1,5 @@
 import cProfile
+import json
 import logging
 import os
 import pstats
@@ -6,13 +7,16 @@
 import signal
 import tempfile
 import time
 import traceback
-from queue import Empty as QueueEmpty

 from django.conf import settings
 from django.utils.timezone import now as tz_now
 from django.db import DatabaseError, OperationalError, connection as django_connection
 from django.db.utils import InterfaceError, InternalError, IntegrityError

+import psutil
+import redis
+
 from awx.main.consumers import emit_channel_notification
 from awx.main.models import (JobEvent, AdHocCommandEvent, ProjectUpdateEvent,
                              InventoryUpdateEvent, SystemJobEvent, UnifiedJob,
@@ -24,10 +28,6 @@ from .base import BaseWorker

 logger = logging.getLogger('awx.main.commands.run_callback_receiver')

-# the number of seconds to buffer events in memory before flushing
-# using JobEvent.objects.bulk_create()
-BUFFER_SECONDS = .1
-

 class CallbackBrokerWorker(BaseWorker):
     '''
@@ -39,21 +39,57 @@ class CallbackBrokerWorker(BaseWorker):
     '''

     MAX_RETRIES = 2
+    last_stats = time.time()
+    total = 0
+    last_event = ''
     prof = None

     def __init__(self):
         self.buff = {}
+        self.pid = os.getpid()
+        self.redis = redis.Redis.from_url(settings.BROKER_URL)
+        for key in self.redis.keys('awx_callback_receiver_statistics_*'):
+            self.redis.delete(key)

     def read(self, queue):
         try:
-            return queue.get(block=True, timeout=BUFFER_SECONDS)
-        except QueueEmpty:
-            return {'event': 'FLUSH'}
+            res = self.redis.blpop(settings.CALLBACK_QUEUE, timeout=settings.JOB_EVENT_BUFFER_SECONDS)
+            if res is None:
+                return {'event': 'FLUSH'}
+            self.total += 1
+            return json.loads(res[1])
+        except redis.exceptions.RedisError:
+            logger.exception("encountered an error communicating with redis")
+            time.sleep(1)
+        except (json.JSONDecodeError, KeyError):
+            logger.exception("failed to decode JSON message from redis")
+        finally:
+            self.record_statistics()
+        return {'event': 'FLUSH'}
+
+    def record_statistics(self):
+        # buffer stat recording to once per (by default) 5s
+        if time.time() - self.last_stats > settings.JOB_EVENT_STATISTICS_INTERVAL:
+            try:
+                self.redis.set(f'awx_callback_receiver_statistics_{self.pid}', self.debug())
+                self.last_stats = time.time()
+            except Exception:
+                logger.exception("encountered an error communicating with redis")
+                self.last_stats = time.time()
+
+    def debug(self):
+        return f'. worker[pid:{self.pid}] sent={self.total} rss={self.mb}MB {self.last_event}'
+
+    @property
+    def mb(self):
+        return '{:0.3f}'.format(
+            psutil.Process(self.pid).memory_info().rss / 1024.0 / 1024.0
+        )

     def toggle_profiling(self, *args):
         if self.prof:
             self.prof.disable()
-            filename = f'callback-{os.getpid()}.pstats'
+            filename = f'callback-{self.pid}.pstats'
             filepath = os.path.join(tempfile.gettempdir(), filename)
             with open(filepath, 'w') as f:
                 pstats.Stats(self.prof, stream=f).sort_stats('cumulative').print_stats()
@@ -108,6 +144,8 @@ class CallbackBrokerWorker(BaseWorker):
     def perform_work(self, body):
         try:
             flush = body.get('event') == 'FLUSH'
+            if flush:
+                self.last_event = ''
             if not flush:
                 event_map = {
                     'job_id': JobEvent,
@@ -123,6 +161,8 @@ class CallbackBrokerWorker(BaseWorker):
                     job_identifier = body[key]
                     break

+            self.last_event = f'\n\t- {cls.__name__} for #{job_identifier} ({body.get("event", "")} {body.get("uuid", "")})'  # noqa
+
             if body.get('event') == 'EOF':
                 try:
                     final_counter = body.get('final_counter', 0)
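
Two details of the new read() are worth noting: a BLPOP timeout returns None, which the worker translates into the synthetic FLUSH event that previously came from catching QueueEmpty, and record_statistics() runs on every read via the finally block but rate-limits its own Redis writes. A hedged sketch of that interval gate in isolation (names mirror the diff; the interval stands in for the new JOB_EVENT_STATISTICS_INTERVAL = 5 default):

import os
import time

import redis

STATS_INTERVAL = 5  # stand-in for settings.JOB_EVENT_STATISTICS_INTERVAL
conn = redis.Redis.from_url('redis://localhost:6379/0')
last_stats = time.time()


def record_statistics(summary):
    global last_stats
    # Touch Redis at most once per interval, no matter how often the
    # read loop calls this from its finally block.
    if time.time() - last_stats > STATS_INTERVAL:
        try:
            conn.set(f'awx_callback_receiver_statistics_{os.getpid()}', summary)
        except Exception:
            pass  # the real worker logs the failure here
        finally:
            # advance the clock even on failure, as the diff does in
            # both its success and exception branches
            last_stats = time.time()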

View File: awx/settings/defaults.py

@@ -197,6 +197,14 @@ LOCAL_STDOUT_EXPIRE_TIME = 2592000
 # events into the database
 JOB_EVENT_WORKERS = 4

+# The number of seconds (must be an integer) to buffer callback receiver bulk
+# writes in memory before flushing via JobEvent.objects.bulk_create()
+JOB_EVENT_BUFFER_SECONDS = 1
+
+# The interval at which callback receiver statistics should be
+# recorded
+JOB_EVENT_STATISTICS_INTERVAL = 5
+
 # The maximum size of the job event worker queue before requests are blocked
 JOB_EVENT_MAX_QUEUE_SIZE = 10000
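
The "must be an integer" caveat on JOB_EVENT_BUFFER_SECONDS exists because the value is passed straight through as the BLPOP timeout, which at the time only accepted whole seconds; that is also why the old sub-second BUFFER_SECONDS = .1 could not carry over. A hedged example of tuning both knobs in a local settings override (where that override lives is up to the deployment):

# e.g. in a settings file layered over awx/settings/defaults.py

# Flush buffered events at most every 2 seconds. Integers only:
# this value becomes the timeout argument to redis BLPOP.
JOB_EVENT_BUFFER_SECONDS = 2

# Publish each worker's statistics key every 10 seconds instead of 5.
JOB_EVENT_STATISTICS_INTERVAL = 10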