diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index fd96a4f04e..e61a516094 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -38,6 +38,7 @@ class CallbackBrokerWorker(BaseWorker): MAX_RETRIES = 2 last_stats = time.time() + last_flush = time.time() total = 0 last_event = '' prof = None @@ -52,7 +53,7 @@ class CallbackBrokerWorker(BaseWorker): def read(self, queue): try: - res = self.redis.blpop(settings.CALLBACK_QUEUE, timeout=settings.JOB_EVENT_BUFFER_SECONDS) + res = self.redis.blpop(settings.CALLBACK_QUEUE, timeout=1) if res is None: return {'event': 'FLUSH'} self.total += 1 @@ -102,6 +103,7 @@ class CallbackBrokerWorker(BaseWorker): now = tz_now() if ( force or + (time.time() - self.last_flush) > settings.JOB_EVENT_BUFFER_SECONDS or any([len(events) >= 1000 for events in self.buff.values()]) ): for cls, events in self.buff.items(): @@ -124,6 +126,7 @@ class CallbackBrokerWorker(BaseWorker): for e in events: emit_event_detail(e) self.buff = {} + self.last_flush = time.time() def perform_work(self, body): try: diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 05c8a42f20..71881918a3 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -196,9 +196,9 @@ LOCAL_STDOUT_EXPIRE_TIME = 2592000 # events into the database JOB_EVENT_WORKERS = 4 -# The number of seconds (must be an integer) to buffer callback receiver bulk +# The number of seconds to buffer callback receiver bulk # writes in memory before flushing via JobEvent.objects.bulk_create() -JOB_EVENT_BUFFER_SECONDS = 1 +JOB_EVENT_BUFFER_SECONDS = .1 # The interval at which callback receiver statistics should be # recorded