Move websocket skip logic into event_handler

This commit is contained in:
Alan Rominger
2021-04-29 13:14:50 -04:00
parent b43d8e2c7f
commit b551608f16
3 changed files with 30 additions and 27 deletions

View File

@@ -142,7 +142,8 @@ class CallbackBrokerWorker(BaseWorker):
logger.exception('Database Error Saving Job Event') logger.exception('Database Error Saving Job Event')
duration_to_save = time.perf_counter() - duration_to_save duration_to_save = time.perf_counter() - duration_to_save
for e in events: for e in events:
emit_event_detail(e) if not e.event_data.get('skip_websocket_message', False):
emit_event_detail(e)
self.buff = {} self.buff = {}
self.last_flush = time.time() self.last_flush = time.time()
# only update metrics if we saved events # only update metrics if we saved events

View File

@@ -2,8 +2,7 @@
import datetime import datetime
import logging import logging
from collections import defaultdict, deque from collections import defaultdict
import time
from django.conf import settings from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
@@ -60,33 +59,10 @@ def create_host_status_counts(event_data):
MINIMAL_EVENTS = set(['playbook_on_play_start', 'playbook_on_task_start', 'playbook_on_stats', 'EOF']) MINIMAL_EVENTS = set(['playbook_on_play_start', 'playbook_on_task_start', 'playbook_on_stats', 'EOF'])
MAX_WEBSOCKET_EVENT_RATE = 30
# TODO: these should be job-specific, this is the easy part, that is the hard part
emit_times = deque(maxlen=MAX_WEBSOCKET_EVENT_RATE)
def emit_event_detail(event): def emit_event_detail(event):
# websocket rate limiting logic
if settings.UI_LIVE_UPDATES_ENABLED is False and event.event not in MINIMAL_EVENTS: if settings.UI_LIVE_UPDATES_ENABLED is False and event.event not in MINIMAL_EVENTS:
return return
cpu_time = time.time()
if emit_times:
first_window_time = emit_times[0]
inverse_effective_rate = cpu_time - first_window_time
if inverse_effective_rate < 1.0:
if emit_times[0] != emit_times[-1]:
logger.info('Too many events chief, not broadcasting because that would be crazy')
# this is to smooth out jumpiness, we clear the events except for the last one
# that will enforce that we wait a full second before starting again
emit_times.clear()
emit_times.append(first_window_time)
return
elif emit_times[0] == emit_times[-1]:
logger.info('Starting a window of emit emission, will pause if I see too many')
emit_times.append(cpu_time)
cls = event.__class__ cls = event.__class__
relation = { relation = {
JobEvent: 'job_id', JobEvent: 'job_id',

View File

@@ -4,7 +4,7 @@
# All Rights Reserved. # All Rights Reserved.
# Python # Python
from collections import OrderedDict, namedtuple from collections import OrderedDict, namedtuple, deque
import errno import errno
import functools import functools
import importlib import importlib
@@ -729,6 +729,10 @@ def with_path_cleanup(f):
return _wrapped return _wrapped
# TODO: move to CTiT settings if we ever get serious about this
MAX_WEBSOCKET_EVENT_RATE = 30
class BaseTask(object): class BaseTask(object):
model = None model = None
event_model = None event_model = None
@@ -740,6 +744,7 @@ class BaseTask(object):
self.host_map = {} self.host_map = {}
self.guid = GuidMiddleware.get_guid() self.guid = GuidMiddleware.get_guid()
self.job_created = None self.job_created = None
self.recent_event_timings = deque(maxlen=MAX_WEBSOCKET_EVENT_RATE)
def update_model(self, pk, _attempt=0, **updates): def update_model(self, pk, _attempt=0, **updates):
"""Reload the model instance from the database and update the """Reload the model instance from the database and update the
@@ -1151,6 +1156,27 @@ class BaseTask(object):
if 'event_data' in event_data: if 'event_data' in event_data:
event_data['event_data']['guid'] = self.guid event_data['event_data']['guid'] = self.guid
# To prevent overwhelming the broadcast queue, skip some websocket messages
cpu_time = time.time()
if self.recent_event_timings:
first_window_time = self.recent_event_timings[0]
inverse_effective_rate = cpu_time - first_window_time
# if last 30 events came in under 1 second ago
if inverse_effective_rate < 1.0:
if self.recent_event_timings[0] != self.recent_event_timings[-1]:
logger.info('Too many events chief, not broadcasting because that would be crazy')
# this is to smooth out jumpiness, we clear the events except for the last one
# that will enforce that we wait a full second before starting again
self.recent_event_timings.clear()
self.recent_event_timings.append(first_window_time)
event_data['skip_websocket_message'] = True
else:
if self.recent_event_timings[0] == self.recent_event_timings[-1]:
logger.info('Starting a window of event emission, will pause if I see too many')
self.recent_event_timings.append(cpu_time)
else:
self.recent_event_timings.append(cpu_time)
event_data.setdefault(self.event_data_key, self.instance.id) event_data.setdefault(self.event_data_key, self.instance.id)
self.dispatcher.dispatch(event_data) self.dispatcher.dispatch(event_data)
self.event_ct += 1 self.event_ct += 1