further optimize callback receiver buffering for certain situations

see: https://github.com/ansible/awx/issues/9085
This commit is contained in:
Ryan Petrello 2021-01-14 14:43:54 -05:00
parent d88ed19edf
commit b744c4ebb7
No known key found for this signature in database
GPG Key ID: F2AA5F2122351777
2 changed files with 6 additions and 3 deletions

View File

@ -38,6 +38,7 @@ class CallbackBrokerWorker(BaseWorker):
MAX_RETRIES = 2
last_stats = time.time()
last_flush = time.time()
total = 0
last_event = ''
prof = None
@ -52,7 +53,7 @@ class CallbackBrokerWorker(BaseWorker):
def read(self, queue):
try:
res = self.redis.blpop(settings.CALLBACK_QUEUE, timeout=settings.JOB_EVENT_BUFFER_SECONDS)
res = self.redis.blpop(settings.CALLBACK_QUEUE, timeout=1)
if res is None:
return {'event': 'FLUSH'}
self.total += 1
@ -102,6 +103,7 @@ class CallbackBrokerWorker(BaseWorker):
now = tz_now()
if (
force or
(time.time() - self.last_flush) > settings.JOB_EVENT_BUFFER_SECONDS or
any([len(events) >= 1000 for events in self.buff.values()])
):
for cls, events in self.buff.items():
@ -124,6 +126,7 @@ class CallbackBrokerWorker(BaseWorker):
for e in events:
emit_event_detail(e)
self.buff = {}
self.last_flush = time.time()
def perform_work(self, body):
try:

View File

@ -196,9 +196,9 @@ LOCAL_STDOUT_EXPIRE_TIME = 2592000
# events into the database
JOB_EVENT_WORKERS = 4
# The number of seconds (must be an integer) to buffer callback receiver bulk
# The number of seconds to buffer callback receiver bulk
# writes in memory before flushing via JobEvent.objects.bulk_create()
JOB_EVENT_BUFFER_SECONDS = 1
JOB_EVENT_BUFFER_SECONDS = .1
# The interval at which callback receiver statistics should be
# recorded