Merge pull request #9086 from ryanpetrello/more-callback-tinkering

further optimize callback receiver buffering for certain situations

Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
This commit is contained in:
softwarefactory-project-zuul[bot] 2021-01-18 19:13:23 +00:00 committed by GitHub
commit 3a467067f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 6 additions and 3 deletions

View File

@ -38,6 +38,7 @@ class CallbackBrokerWorker(BaseWorker):
MAX_RETRIES = 2
last_stats = time.time()
last_flush = time.time()
total = 0
last_event = ''
prof = None
@ -52,7 +53,7 @@ class CallbackBrokerWorker(BaseWorker):
def read(self, queue):
try:
res = self.redis.blpop(settings.CALLBACK_QUEUE, timeout=settings.JOB_EVENT_BUFFER_SECONDS)
res = self.redis.blpop(settings.CALLBACK_QUEUE, timeout=1)
if res is None:
return {'event': 'FLUSH'}
self.total += 1
@ -102,6 +103,7 @@ class CallbackBrokerWorker(BaseWorker):
now = tz_now()
if (
force or
(time.time() - self.last_flush) > settings.JOB_EVENT_BUFFER_SECONDS or
any([len(events) >= 1000 for events in self.buff.values()])
):
for cls, events in self.buff.items():
@ -124,6 +126,7 @@ class CallbackBrokerWorker(BaseWorker):
for e in events:
emit_event_detail(e)
self.buff = {}
self.last_flush = time.time()
def perform_work(self, body):
try:

View File

@ -196,9 +196,9 @@ LOCAL_STDOUT_EXPIRE_TIME = 2592000
# events into the database
JOB_EVENT_WORKERS = 4
# The number of seconds (must be an integer) to buffer callback receiver bulk
# The number of seconds to buffer callback receiver bulk
# writes in memory before flushing via JobEvent.objects.bulk_create()
JOB_EVENT_BUFFER_SECONDS = 1
JOB_EVENT_BUFFER_SECONDS = .1
# The interval at which callback receiver statistics should be
# recorded