diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index acfb0bce02..65d5f16844 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -142,7 +142,8 @@ class CallbackBrokerWorker(BaseWorker): logger.exception('Database Error Saving Job Event') duration_to_save = time.perf_counter() - duration_to_save for e in events: - emit_event_detail(e) + if not e.event_data.get('skip_websocket_message', False): + emit_event_detail(e) self.buff = {} self.last_flush = time.time() # only update metrics if we saved events diff --git a/awx/main/models/events.py b/awx/main/models/events.py index 8ddb56774c..222eb22438 100644 --- a/awx/main/models/events.py +++ b/awx/main/models/events.py @@ -2,8 +2,7 @@ import datetime import logging -from collections import defaultdict, deque -import time +from collections import defaultdict from django.conf import settings from django.core.exceptions import ObjectDoesNotExist @@ -60,33 +59,10 @@ def create_host_status_counts(event_data): MINIMAL_EVENTS = set(['playbook_on_play_start', 'playbook_on_task_start', 'playbook_on_stats', 'EOF']) -MAX_WEBSOCKET_EVENT_RATE = 30 - -# TODO: these should be job-specific, this is the easy part, that is the hard part -emit_times = deque(maxlen=MAX_WEBSOCKET_EVENT_RATE) - def emit_event_detail(event): - - # websocket rate limiting logic if settings.UI_LIVE_UPDATES_ENABLED is False and event.event not in MINIMAL_EVENTS: return - cpu_time = time.time() - if emit_times: - first_window_time = emit_times[0] - inverse_effective_rate = cpu_time - first_window_time - if inverse_effective_rate < 1.0: - if emit_times[0] != emit_times[-1]: - logger.info('Too many events chief, not broadcasting because that would be crazy') - # this is to smooth out jumpiness, we clear the events except for the last one - # that will enforce that we wait a full second before starting again - emit_times.clear() - emit_times.append(first_window_time) - return - elif emit_times[0] == emit_times[-1]: - logger.info('Starting a window of emit emission, will pause if I see too many') - emit_times.append(cpu_time) - cls = event.__class__ relation = { JobEvent: 'job_id', diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 317c3d6111..a5a42e3a67 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -4,7 +4,7 @@ # All Rights Reserved. # Python -from collections import OrderedDict, namedtuple +from collections import OrderedDict, namedtuple, deque import errno import functools import importlib @@ -729,6 +729,10 @@ def with_path_cleanup(f): return _wrapped +# TODO: move to CTiT settings if we ever get serious about this +MAX_WEBSOCKET_EVENT_RATE = 30 + + class BaseTask(object): model = None event_model = None @@ -740,6 +744,7 @@ class BaseTask(object): self.host_map = {} self.guid = GuidMiddleware.get_guid() self.job_created = None + self.recent_event_timings = deque(maxlen=MAX_WEBSOCKET_EVENT_RATE) def update_model(self, pk, _attempt=0, **updates): """Reload the model instance from the database and update the @@ -1151,6 +1156,27 @@ class BaseTask(object): if 'event_data' in event_data: event_data['event_data']['guid'] = self.guid + # To prevent overwhelming the broadcast queue, skip some websocket messages + cpu_time = time.time() + if self.recent_event_timings: + first_window_time = self.recent_event_timings[0] + inverse_effective_rate = cpu_time - first_window_time + # if last 30 events came in under 1 second ago + if inverse_effective_rate < 1.0: + if self.recent_event_timings[0] != self.recent_event_timings[-1]: + logger.info('Too many events chief, not broadcasting because that would be crazy') + # this is to smooth out jumpiness, we clear the events except for the last one + # that will enforce that we wait a full second before starting again + self.recent_event_timings.clear() + self.recent_event_timings.append(first_window_time) + event_data['skip_websocket_message'] = True + else: + if self.recent_event_timings[0] == self.recent_event_timings[-1]: + logger.info('Starting a window of event emission, will pause if I see too many') + self.recent_event_timings.append(cpu_time) + else: + self.recent_event_timings.append(cpu_time) + event_data.setdefault(self.event_data_key, self.instance.id) self.dispatcher.dispatch(event_data) self.event_ct += 1