From b43d8e2c7faaf3a46d2b31a2120c887320cc023d Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Thu, 29 Apr 2021 11:23:41 -0400 Subject: [PATCH 01/20] Enforce a 30-per-second max event websocket rate --- awx/main/models/events.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/awx/main/models/events.py b/awx/main/models/events.py index 222eb22438..8ddb56774c 100644 --- a/awx/main/models/events.py +++ b/awx/main/models/events.py @@ -2,7 +2,8 @@ import datetime import logging -from collections import defaultdict +from collections import defaultdict, deque +import time from django.conf import settings from django.core.exceptions import ObjectDoesNotExist @@ -59,10 +60,33 @@ def create_host_status_counts(event_data): MINIMAL_EVENTS = set(['playbook_on_play_start', 'playbook_on_task_start', 'playbook_on_stats', 'EOF']) +MAX_WEBSOCKET_EVENT_RATE = 30 + +# TODO: these should be job-specific, this is the easy part, that is the hard part +emit_times = deque(maxlen=MAX_WEBSOCKET_EVENT_RATE) + def emit_event_detail(event): + + # websocket rate limiting logic if settings.UI_LIVE_UPDATES_ENABLED is False and event.event not in MINIMAL_EVENTS: return + cpu_time = time.time() + if emit_times: + first_window_time = emit_times[0] + inverse_effective_rate = cpu_time - first_window_time + if inverse_effective_rate < 1.0: + if emit_times[0] != emit_times[-1]: + logger.info('Too many events chief, not broadcasting because that would be crazy') + # this is to smooth out jumpiness, we clear the events except for the last one + # that will enforce that we wait a full second before starting again + emit_times.clear() + emit_times.append(first_window_time) + return + elif emit_times[0] == emit_times[-1]: + logger.info('Starting a window of emit emission, will pause if I see too many') + emit_times.append(cpu_time) + cls = event.__class__ relation = { JobEvent: 'job_id', From b551608f16caab4bee715bb85f91f05e54af0c6a Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Thu, 29 Apr 2021 13:14:50 -0400 Subject: [PATCH 02/20] Move websocket skip logic into event_handler --- awx/main/dispatch/worker/callback.py | 3 ++- awx/main/models/events.py | 26 +------------------------- awx/main/tasks.py | 28 +++++++++++++++++++++++++++- 3 files changed, 30 insertions(+), 27 deletions(-) diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index acfb0bce02..65d5f16844 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -142,7 +142,8 @@ class CallbackBrokerWorker(BaseWorker): logger.exception('Database Error Saving Job Event') duration_to_save = time.perf_counter() - duration_to_save for e in events: - emit_event_detail(e) + if not e.event_data.get('skip_websocket_message', False): + emit_event_detail(e) self.buff = {} self.last_flush = time.time() # only update metrics if we saved events diff --git a/awx/main/models/events.py b/awx/main/models/events.py index 8ddb56774c..222eb22438 100644 --- a/awx/main/models/events.py +++ b/awx/main/models/events.py @@ -2,8 +2,7 @@ import datetime import logging -from collections import defaultdict, deque -import time +from collections import defaultdict from django.conf import settings from django.core.exceptions import ObjectDoesNotExist @@ -60,33 +59,10 @@ def create_host_status_counts(event_data): MINIMAL_EVENTS = set(['playbook_on_play_start', 'playbook_on_task_start', 'playbook_on_stats', 'EOF']) -MAX_WEBSOCKET_EVENT_RATE = 30 - -# TODO: these should be job-specific, this is the easy part, that is the hard part -emit_times = deque(maxlen=MAX_WEBSOCKET_EVENT_RATE) - def emit_event_detail(event): - - # websocket rate limiting logic if settings.UI_LIVE_UPDATES_ENABLED is False and event.event not in MINIMAL_EVENTS: return - cpu_time = time.time() - if emit_times: - first_window_time = emit_times[0] - inverse_effective_rate = cpu_time - first_window_time - if inverse_effective_rate < 1.0: - if emit_times[0] != emit_times[-1]: - logger.info('Too many events chief, not broadcasting because that would be crazy') - # this is to smooth out jumpiness, we clear the events except for the last one - # that will enforce that we wait a full second before starting again - emit_times.clear() - emit_times.append(first_window_time) - return - elif emit_times[0] == emit_times[-1]: - logger.info('Starting a window of emit emission, will pause if I see too many') - emit_times.append(cpu_time) - cls = event.__class__ relation = { JobEvent: 'job_id', diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 317c3d6111..a5a42e3a67 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -4,7 +4,7 @@ # All Rights Reserved. # Python -from collections import OrderedDict, namedtuple +from collections import OrderedDict, namedtuple, deque import errno import functools import importlib @@ -729,6 +729,10 @@ def with_path_cleanup(f): return _wrapped +# TODO: move to CTiT settings if we ever get serious about this +MAX_WEBSOCKET_EVENT_RATE = 30 + + class BaseTask(object): model = None event_model = None @@ -740,6 +744,7 @@ class BaseTask(object): self.host_map = {} self.guid = GuidMiddleware.get_guid() self.job_created = None + self.recent_event_timings = deque(maxlen=MAX_WEBSOCKET_EVENT_RATE) def update_model(self, pk, _attempt=0, **updates): """Reload the model instance from the database and update the @@ -1151,6 +1156,27 @@ class BaseTask(object): if 'event_data' in event_data: event_data['event_data']['guid'] = self.guid + # To prevent overwhelming the broadcast queue, skip some websocket messages + cpu_time = time.time() + if self.recent_event_timings: + first_window_time = self.recent_event_timings[0] + inverse_effective_rate = cpu_time - first_window_time + # if last 30 events came in under 1 second ago + if inverse_effective_rate < 1.0: + if self.recent_event_timings[0] != self.recent_event_timings[-1]: + logger.info('Too many events chief, not broadcasting because that would be crazy') + # this is to smooth out jumpiness, we clear the events except for the last one + # that will enforce that we wait a full second before starting again + self.recent_event_timings.clear() + self.recent_event_timings.append(first_window_time) + event_data['skip_websocket_message'] = True + else: + if self.recent_event_timings[0] == self.recent_event_timings[-1]: + logger.info('Starting a window of event emission, will pause if I see too many') + self.recent_event_timings.append(cpu_time) + else: + self.recent_event_timings.append(cpu_time) + event_data.setdefault(self.event_data_key, self.instance.id) self.dispatcher.dispatch(event_data) self.event_ct += 1 From cbb461ab71e92ebab72b0cafb2f86bd0ec7e3cc9 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Thu, 29 Apr 2021 13:26:22 -0400 Subject: [PATCH 03/20] Fix bug --- awx/main/tasks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index a5a42e3a67..5b0dabac6c 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -1169,7 +1169,8 @@ class BaseTask(object): # that will enforce that we wait a full second before starting again self.recent_event_timings.clear() self.recent_event_timings.append(first_window_time) - event_data['skip_websocket_message'] = True + event_data.setdefault('event_data', {}) + event_data['event_data']['skip_websocket_message'] = True else: if self.recent_event_timings[0] == self.recent_event_timings[-1]: logger.info('Starting a window of event emission, will pause if I see too many') From 01228cea028a06dba40412ccdc0308cb4f812235 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Thu, 29 Apr 2021 13:59:16 -0400 Subject: [PATCH 04/20] Implement max event websocket rate as setting --- awx/main/conf.py | 11 +++++++++++ awx/main/tasks.py | 12 ++++-------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/awx/main/conf.py b/awx/main/conf.py index 09360fc54f..a69a0642f8 100644 --- a/awx/main/conf.py +++ b/awx/main/conf.py @@ -344,6 +344,17 @@ register( category_slug='jobs', ) +register( + 'MAX_WEBSOCKET_EVENT_RATE', + field_class=fields.IntegerField, + min_value=0, + default=30, + label=_('Job Event Maximum Websocket Messages Per Second'), + help_text=_('Maximum number of messages to update the UI live job output with per second. Value of 0 means no limit.'), + category=_('Jobs'), + category_slug='jobs', +) + register( 'SCHEDULE_MAX_JOBS', field_class=fields.IntegerField, diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 5b0dabac6c..a13e0c1ab9 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -729,10 +729,6 @@ def with_path_cleanup(f): return _wrapped -# TODO: move to CTiT settings if we ever get serious about this -MAX_WEBSOCKET_EVENT_RATE = 30 - - class BaseTask(object): model = None event_model = None @@ -744,7 +740,7 @@ class BaseTask(object): self.host_map = {} self.guid = GuidMiddleware.get_guid() self.job_created = None - self.recent_event_timings = deque(maxlen=MAX_WEBSOCKET_EVENT_RATE) + self.recent_event_timings = deque(maxlen=settings.MAX_WEBSOCKET_EVENT_RATE) def update_model(self, pk, _attempt=0, **updates): """Reload the model instance from the database and update the @@ -1157,8 +1153,8 @@ class BaseTask(object): event_data['event_data']['guid'] = self.guid # To prevent overwhelming the broadcast queue, skip some websocket messages - cpu_time = time.time() if self.recent_event_timings: + cpu_time = time.time() first_window_time = self.recent_event_timings[0] inverse_effective_rate = cpu_time - first_window_time # if last 30 events came in under 1 second ago @@ -1175,8 +1171,8 @@ class BaseTask(object): if self.recent_event_timings[0] == self.recent_event_timings[-1]: logger.info('Starting a window of event emission, will pause if I see too many') self.recent_event_timings.append(cpu_time) - else: - self.recent_event_timings.append(cpu_time) + elif self.recent_event_timings.maxlen: + self.recent_event_timings.append(time.time()) event_data.setdefault(self.event_data_key, self.instance.id) self.dispatcher.dispatch(event_data) From faa0a6cf9a85c6aef89d4148e59a98fe01fae534 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Thu, 29 Apr 2021 14:18:36 -0400 Subject: [PATCH 05/20] fix up log wording --- awx/main/tasks.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index a13e0c1ab9..5dd9416569 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -1160,7 +1160,9 @@ class BaseTask(object): # if last 30 events came in under 1 second ago if inverse_effective_rate < 1.0: if self.recent_event_timings[0] != self.recent_event_timings[-1]: - logger.info('Too many events chief, not broadcasting because that would be crazy') + logger.info( + 'Too many events, skipping websocket {} broadcast for {} seconds'.format(self.instance.log_format, 1.0 - inverse_effective_rate) + ) # this is to smooth out jumpiness, we clear the events except for the last one # that will enforce that we wait a full second before starting again self.recent_event_timings.clear() @@ -1169,7 +1171,7 @@ class BaseTask(object): event_data['event_data']['skip_websocket_message'] = True else: if self.recent_event_timings[0] == self.recent_event_timings[-1]: - logger.info('Starting a window of event emission, will pause if I see too many') + logger.debug('Starting a window of event emission') self.recent_event_timings.append(cpu_time) elif self.recent_event_timings.maxlen: self.recent_event_timings.append(time.time()) From 50ca2d47ceb05254b5c02e99f11092acb315a775 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Thu, 29 Apr 2021 14:24:34 -0400 Subject: [PATCH 06/20] Further log adjustments --- awx/main/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 5dd9416569..af7f5bc83f 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -1160,8 +1160,8 @@ class BaseTask(object): # if last 30 events came in under 1 second ago if inverse_effective_rate < 1.0: if self.recent_event_timings[0] != self.recent_event_timings[-1]: - logger.info( - 'Too many events, skipping websocket {} broadcast for {} seconds'.format(self.instance.log_format, 1.0 - inverse_effective_rate) + logger.debug( + 'Too many events, skipping job {} websocket broadcast for {:.4f} seconds'.format(self.instance.id, 1.0 - inverse_effective_rate) ) # this is to smooth out jumpiness, we clear the events except for the last one # that will enforce that we wait a full second before starting again From 70420dc3e44528c369fed372958f3568c08c566f Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Fri, 30 Apr 2021 11:59:15 -0400 Subject: [PATCH 07/20] THIS DOES NOT WORK pass events if they fit either timing criteria --- awx/main/tasks.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index af7f5bc83f..1cbfe240b0 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -1156,17 +1156,15 @@ class BaseTask(object): if self.recent_event_timings: cpu_time = time.time() first_window_time = self.recent_event_timings[0] + last_window_time = self.recent_event_timings[-1] inverse_effective_rate = cpu_time - first_window_time - # if last 30 events came in under 1 second ago - if inverse_effective_rate < 1.0: - if self.recent_event_timings[0] != self.recent_event_timings[-1]: - logger.debug( - 'Too many events, skipping job {} websocket broadcast for {:.4f} seconds'.format(self.instance.id, 1.0 - inverse_effective_rate) - ) - # this is to smooth out jumpiness, we clear the events except for the last one - # that will enforce that we wait a full second before starting again - self.recent_event_timings.clear() - self.recent_event_timings.append(first_window_time) + # if last 30 events (which we sent websockets for) came in under 1 second ago + should_skip = bool( + inverse_effective_rate < 1.0 + and self.recent_event_timings.maxlen * (cpu_time - last_window_time) < 1.0 + and (len(self.recent_event_timings) == self.recent_event_timings.maxlen) + ) + if should_skip: event_data.setdefault('event_data', {}) event_data['event_data']['skip_websocket_message'] = True else: From 4b6b8f2bddb0cfdb888aea4c3d1377c40fbe0298 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 3 May 2021 11:32:03 -0400 Subject: [PATCH 08/20] Finish up the immediate or average rate method --- awx/main/tasks.py | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 1cbfe240b0..58e033bcc4 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -1157,20 +1157,32 @@ class BaseTask(object): cpu_time = time.time() first_window_time = self.recent_event_timings[0] last_window_time = self.recent_event_timings[-1] - inverse_effective_rate = cpu_time - first_window_time - # if last 30 events (which we sent websockets for) came in under 1 second ago - should_skip = bool( - inverse_effective_rate < 1.0 - and self.recent_event_timings.maxlen * (cpu_time - last_window_time) < 1.0 - and (len(self.recent_event_timings) == self.recent_event_timings.maxlen) + + should_emit = bool( + # if 30the most recent websocket message was sent over 1 second ago + cpu_time - first_window_time > 1.0 + # if the very last websocket message came in over 1/30 seconds ago + or self.recent_event_timings.maxlen * (cpu_time - last_window_time) > 1.0 + # if the queue is not yet full + or (len(self.recent_event_timings) != self.recent_event_timings.maxlen) ) - if should_skip: + + logger.debug( + 'Event {} websocket send {}, queue {}, avg rate {}, last rate {}'.format( + event_data['counter'], + should_emit, + len(self.recent_event_timings), + 30.0 / (cpu_time - first_window_time), + 1.0 / (cpu_time - last_window_time), + ) + ) + + if should_emit: + self.recent_event_timings.append(cpu_time) + else: event_data.setdefault('event_data', {}) event_data['event_data']['skip_websocket_message'] = True - else: - if self.recent_event_timings[0] == self.recent_event_timings[-1]: - logger.debug('Starting a window of event emission') - self.recent_event_timings.append(cpu_time) + elif self.recent_event_timings.maxlen: self.recent_event_timings.append(time.time()) From b306c6f25889a64999653d066c0ab205f3886711 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Tue, 4 May 2021 09:08:32 -0400 Subject: [PATCH 09/20] Put new setting in defaults so unit tests will run --- awx/settings/defaults.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 1f4853b11d..547e83fe32 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -197,8 +197,9 @@ UI_LIVE_UPDATES_ENABLED = True # beyond this limit and the value will be removed MAX_EVENT_RES_DATA = 700000 -# Note: This setting may be overridden by database settings. +# Note: These settings may be overridden by database settings. EVENT_STDOUT_MAX_BYTES_DISPLAY = 1024 +MAX_WEBSOCKET_EVENT_RATE = 30 # The amount of time before a stdout file is expired and removed locally # Note that this can be recreated if the stdout is downloaded From 4052603238abd2109b16ebe428b3db49c307458f Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Tue, 4 May 2021 10:11:29 -0400 Subject: [PATCH 10/20] make sure log format does not error --- awx/main/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 58e033bcc4..c9f0154561 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -1169,7 +1169,7 @@ class BaseTask(object): logger.debug( 'Event {} websocket send {}, queue {}, avg rate {}, last rate {}'.format( - event_data['counter'], + event_data.get('counter', 0), should_emit, len(self.recent_event_timings), 30.0 / (cpu_time - first_window_time), From 768ac01f5803cab4baf5174d39803347520ae985 Mon Sep 17 00:00:00 2001 From: Jake McDermott Date: Wed, 5 May 2021 11:13:23 -0400 Subject: [PATCH 11/20] Add basic output tailing When follow mode is enabled, fix the scroll position to the highest row so that the output panel is always displaying the latest events. --- .../src/screens/Job/JobOutput/JobOutput.jsx | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx b/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx index 976952afc7..13b36a776f 100644 --- a/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx +++ b/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx @@ -183,6 +183,11 @@ const OutputWrapper = styled.div` Object.keys(cssMap).map(className => `.${className}{${cssMap[className]}}`)} `; +const ListWrapper = styled(List)` + ${({ isFollowModeEnabled }) => + isFollowModeEnabled ? `overflow: hidden !important;` : ''} +`; + const OutputFooter = styled.div` background-color: #ebebeb; border-right: 1px solid #d7d7d7; @@ -310,6 +315,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { const [showCancelModal, setShowCancelModal] = useState(false); const [remoteRowCount, setRemoteRowCount] = useState(0); const [results, setResults] = useState({}); + const [isFollowModeEnabled, setIsFollowModeEnabled] = useState(false); useEffect(() => { loadJobEvents(); @@ -657,6 +663,14 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { history.push(qs ? `${pathname}?${qs}` : pathname); }; + const handleFollowToggle = () => setIsFollowModeEnabled(!isFollowModeEnabled); + + useEffect(() => { + if (isFollowModeEnabled) { + scrollToRow(remoteRowCount); + } + }, [remoteRowCount, isFollowModeEnabled]); + const renderSearchComponent = () => ( + {isJobRunning(job.status) ? ( + + ) : null} - + ) : ( - { registerChild(ref); listRef.current = ref; From faded278e3715a344891825f4427fa0eea9117fd Mon Sep 17 00:00:00 2001 From: Jake McDermott Date: Wed, 5 May 2021 13:33:58 -0400 Subject: [PATCH 12/20] Disable follow mode on scroll --- .../src/screens/Job/JobOutput/JobOutput.jsx | 43 +++++++++++-------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx b/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx index 13b36a776f..eb0447c28f 100644 --- a/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx +++ b/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx @@ -183,11 +183,6 @@ const OutputWrapper = styled.div` Object.keys(cssMap).map(className => `.${className}{${cssMap[className]}}`)} `; -const ListWrapper = styled(List)` - ${({ isFollowModeEnabled }) => - isFollowModeEnabled ? `overflow: hidden !important;` : ''} -`; - const OutputFooter = styled.div` background-color: #ebebeb; border-right: 1px solid #d7d7d7; @@ -315,7 +310,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { const [showCancelModal, setShowCancelModal] = useState(false); const [remoteRowCount, setRemoteRowCount] = useState(0); const [results, setResults] = useState({}); - const [isFollowModeEnabled, setIsFollowModeEnabled] = useState(false); + const [isFollowEnabled, setIsFollowModeEnabled] = useState(false); useEffect(() => { loadJobEvents(); @@ -610,7 +605,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { }; const handleScrollLast = () => { - scrollToRow(remoteRowCount); + scrollToRow(remoteRowCount - 1); }; const handleResize = ({ width }) => { @@ -663,13 +658,25 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { history.push(qs ? `${pathname}?${qs}` : pathname); }; - const handleFollowToggle = () => setIsFollowModeEnabled(!isFollowModeEnabled); + const handleFollowToggle = () => { + if (isFollowEnabled) { + setIsFollowModeEnabled(false); + } else { + setIsFollowModeEnabled(true); + scrollToRow(remoteRowCount - 1); + } + }; + const handleScroll = () => { + if (listRef?.current?.Grid?._renderedRowStopIndex < remoteRowCount - 1) { + setIsFollowModeEnabled(false); + } + }; useEffect(() => { - if (isFollowModeEnabled) { + if (isFollowEnabled) { scrollToRow(remoteRowCount); } - }, [remoteRowCount, isFollowModeEnabled]); + }, [remoteRowCount, isFollowEnabled]); const renderSearchComponent = () => ( {isJobRunning(job.status) ? ( - ) : null} @@ -790,10 +800,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { onScrollNext={handleScrollNext} onScrollPrevious={handleScrollPrevious} /> - + ) : ( - { registerChild(ref); listRef.current = ref; @@ -824,6 +830,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { scrollToAlignment="start" width={width || 1} overscanRowCount={20} + onScroll={handleScroll} /> )} From b919befc9069e36117a7b23cb8de416906e41665 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 3 May 2021 11:34:33 -0400 Subject: [PATCH 13/20] Add option to record websocket received time --- awxkit/awxkit/ws.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/awxkit/awxkit/ws.py b/awxkit/awxkit/ws.py index 41380d406e..d56fccf719 100644 --- a/awxkit/awxkit/ws.py +++ b/awxkit/awxkit/ws.py @@ -3,6 +3,7 @@ import logging import atexit import json import ssl +import datetime from queue import Queue, Empty from urllib.parse import urlparse @@ -50,7 +51,7 @@ class WSClient(object): # Subscription group types - def __init__(self, token=None, hostname='', port=443, secure=True, session_id=None, csrftoken=None): + def __init__(self, token=None, hostname='', port=443, secure=True, session_id=None, csrftoken=None, add_received_time=False): # delay this import, because this is an optional dependency import websocket @@ -90,6 +91,7 @@ class WSClient(object): self._message_cache = [] self._should_subscribe_to_pending_job = False self._pending_unsubscribe = threading.Event() + self._add_received_time = add_received_time def connect(self): wst = threading.Thread(target=self._ws_run_forever, args=(self.ws, {"cert_reqs": ssl.CERT_NONE})) @@ -195,6 +197,8 @@ class WSClient(object): def _on_message(self, message): message = json.loads(message) log.debug('received message: {}'.format(message)) + if self._add_received_time: + message['received_time'] = datetime.datetime.utcnow() if all([message.get('group_name') == 'jobs', message.get('status') == 'pending', message.get('unified_job_id'), self._should_subscribe_to_pending_job]): if bool(message.get('project_id')) == (self._should_subscribe_to_pending_job['events'] == 'project_update_events'): From 15effd7ade0c9ccf264537982e8f1c495b15b5dc Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Thu, 6 May 2021 13:24:28 -0400 Subject: [PATCH 14/20] Add some conditions for always-send and never-send event types Always send websocket messages for high priority events like playbook_on_stats Never send websocket messages for events with no output unless they are a high priority event type --- awx/main/constants.py | 1 + awx/main/models/events.py | 4 +--- awx/main/tasks.py | 28 ++++++++++++++++++---------- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/awx/main/constants.py b/awx/main/constants.py index b2d62d5ec9..fbd783b968 100644 --- a/awx/main/constants.py +++ b/awx/main/constants.py @@ -41,6 +41,7 @@ STANDARD_INVENTORY_UPDATE_ENV = { } CAN_CANCEL = ('new', 'pending', 'waiting', 'running') ACTIVE_STATES = CAN_CANCEL +MINIMAL_EVENTS = set(['playbook_on_play_start', 'playbook_on_task_start', 'playbook_on_stats', 'EOF']) CENSOR_VALUE = '************' ENV_BLOCKLIST = frozenset( ( diff --git a/awx/main/models/events.py b/awx/main/models/events.py index 222eb22438..0a3cef78d1 100644 --- a/awx/main/models/events.py +++ b/awx/main/models/events.py @@ -17,6 +17,7 @@ from awx.api.versioning import reverse from awx.main import consumers from awx.main.managers import DeferJobCreatedManager from awx.main.fields import JSONField +from awx.main.constants import MINIMAL_EVENTS from awx.main.models.base import CreatedModifiedModel from awx.main.utils import ignore_inventory_computed_fields, camelcase_to_underscore @@ -57,9 +58,6 @@ def create_host_status_counts(event_data): return dict(host_status_counts) -MINIMAL_EVENTS = set(['playbook_on_play_start', 'playbook_on_task_start', 'playbook_on_stats', 'EOF']) - - def emit_event_detail(event): if settings.UI_LIVE_UPDATES_ENABLED is False and event.event not in MINIMAL_EVENTS: return diff --git a/awx/main/tasks.py b/awx/main/tasks.py index c9f0154561..d8697095a3 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -57,7 +57,7 @@ from receptorctl.socket_interface import ReceptorControl # AWX from awx import __version__ as awx_application_version -from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV +from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV, MINIMAL_EVENTS from awx.main.access import access_registry from awx.main.redact import UriCleaner from awx.main.models import ( @@ -1158,17 +1158,25 @@ class BaseTask(object): first_window_time = self.recent_event_timings[0] last_window_time = self.recent_event_timings[-1] - should_emit = bool( - # if 30the most recent websocket message was sent over 1 second ago - cpu_time - first_window_time > 1.0 - # if the very last websocket message came in over 1/30 seconds ago - or self.recent_event_timings.maxlen * (cpu_time - last_window_time) > 1.0 - # if the queue is not yet full - or (len(self.recent_event_timings) != self.recent_event_timings.maxlen) - ) + if event_data.get('event') in MINIMAL_EVENTS: + should_emit = True # always send some types like playbook_on_stats + if event_data.get('stdout') == '' and event_data['start_line'] == event_data['end_line']: + should_emit = False # exclude events with no output + else: + should_emit = any( + [ + # if 30the most recent websocket message was sent over 1 second ago + cpu_time - first_window_time > 1.0, + # if the very last websocket message came in over 1/30 seconds ago + self.recent_event_timings.maxlen * (cpu_time - last_window_time) > 1.0, + # if the queue is not yet full + len(self.recent_event_timings) != self.recent_event_timings.maxlen, + ] + ) logger.debug( - 'Event {} websocket send {}, queue {}, avg rate {}, last rate {}'.format( + 'Job {} event {} websocket send {}, queued: {}, rate - avg: {:.3f}, last: {:.3f}'.format( + self.instance.id, event_data.get('counter', 0), should_emit, len(self.recent_event_timings), From 53e8a9e709e9a5d10a2dbb6a259d92fd75b70e45 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Thu, 6 May 2021 13:34:24 -0400 Subject: [PATCH 15/20] Fix bug --- awx/main/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index d8697095a3..20559f7982 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -1160,7 +1160,7 @@ class BaseTask(object): if event_data.get('event') in MINIMAL_EVENTS: should_emit = True # always send some types like playbook_on_stats - if event_data.get('stdout') == '' and event_data['start_line'] == event_data['end_line']: + elif event_data.get('stdout') == '' and event_data['start_line'] == event_data['end_line']: should_emit = False # exclude events with no output else: should_emit = any( From 210d5084f0e22388cae98e2335c45925984ce35b Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Thu, 13 May 2021 10:54:33 -0400 Subject: [PATCH 16/20] Move skip flag up from event_data and pop it off --- awx/main/dispatch/worker/callback.py | 8 +++++++- awx/main/tasks.py | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index 65d5f16844..279db49bfb 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -142,7 +142,7 @@ class CallbackBrokerWorker(BaseWorker): logger.exception('Database Error Saving Job Event') duration_to_save = time.perf_counter() - duration_to_save for e in events: - if not e.event_data.get('skip_websocket_message', False): + if not getattr(e, '_skip_websocket_message', False): emit_event_detail(e) self.buff = {} self.last_flush = time.time() @@ -208,7 +208,13 @@ class CallbackBrokerWorker(BaseWorker): GuidMiddleware.set_guid('') return + skip_websocket_message = body.pop('skip_websocket_message', False) + event = cls.create_from_data(**body) + + if skip_websocket_message: + event._skip_websocket_message = True + self.buff.setdefault(cls, []).append(event) retries = 0 diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 20559f7982..1fd7fe19f1 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -1189,7 +1189,7 @@ class BaseTask(object): self.recent_event_timings.append(cpu_time) else: event_data.setdefault('event_data', {}) - event_data['event_data']['skip_websocket_message'] = True + event_data['skip_websocket_message'] = True elif self.recent_event_timings.maxlen: self.recent_event_timings.append(time.time()) From 579d49033a72fb60757dfeff28825fa4a94a9899 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Thu, 13 May 2021 11:33:54 -0400 Subject: [PATCH 17/20] Remove debugging log message --- awx/main/tasks.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 1fd7fe19f1..b1e41fbe52 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -1174,17 +1174,6 @@ class BaseTask(object): ] ) - logger.debug( - 'Job {} event {} websocket send {}, queued: {}, rate - avg: {:.3f}, last: {:.3f}'.format( - self.instance.id, - event_data.get('counter', 0), - should_emit, - len(self.recent_event_timings), - 30.0 / (cpu_time - first_window_time), - 1.0 / (cpu_time - last_window_time), - ) - ) - if should_emit: self.recent_event_timings.append(cpu_time) else: From f0e7f2dbcd96b3cc214a33b0442fca490fa27b41 Mon Sep 17 00:00:00 2001 From: mabashian Date: Wed, 12 May 2021 13:41:45 -0400 Subject: [PATCH 18/20] Adds logic to try to keep visible page accurate in follow mode --- .../src/screens/Job/JobOutput/JobOutput.jsx | 99 ++++++++++++------- 1 file changed, 65 insertions(+), 34 deletions(-) diff --git a/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx b/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx index eb0447c28f..b439f86be9 100644 --- a/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx +++ b/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx @@ -38,6 +38,7 @@ import { HostStatusBar, OutputToolbar } from './shared'; import getRowRangePageSize from './shared/jobOutputUtils'; import { getJobModel, isJobRunning } from '../../../util/jobs'; import useRequest, { useDismissableError } from '../../../util/useRequest'; +import useInterval from '../../../util/useInterval'; import { parseQueryString, mergeParams, @@ -297,12 +298,13 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { const listRef = useRef(null); const previousWidth = useRef(0); const jobSocketCounter = useRef(0); - const interval = useRef(null); const isMounted = useIsMounted(); + const scrollTop = useRef(0); + const scrollHeight = useRef(0); + const currentlyLoading = useRef([]); const history = useHistory(); const [contentError, setContentError] = useState(null); const [cssMap, setCssMap] = useState({}); - const [currentlyLoading, setCurrentlyLoading] = useState([]); const [hasContentLoading, setHasContentLoading] = useState(true); const [hostEvent, setHostEvent] = useState({}); const [isHostModalOpen, setIsHostModalOpen] = useState(false); @@ -310,7 +312,17 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { const [showCancelModal, setShowCancelModal] = useState(false); const [remoteRowCount, setRemoteRowCount] = useState(0); const [results, setResults] = useState({}); - const [isFollowEnabled, setIsFollowModeEnabled] = useState(false); + const [isFollowModeEnabled, setIsFollowModeEnabled] = useState( + isJobRunning(job.status) + ); + const [isMonitoringWebsocket, setIsMonitoringWebsocket] = useState(false); + + useInterval( + () => { + monitorJobSocketCounter(); + }, + isMonitoringWebsocket ? 5000 : null + ); useEffect(() => { loadJobEvents(); @@ -331,14 +343,15 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { } } }); - interval.current = setInterval(() => monitorJobSocketCounter(), 5000); + setIsMonitoringWebsocket(true); } return function cleanup() { if (ws) { ws.close(); } - clearInterval(interval.current); + setIsMonitoringWebsocket(false); + isMounted.current = false; }; }, [location.search]); // eslint-disable-line react-hooks/exhaustive-deps @@ -346,7 +359,23 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { if (listRef.current?.recomputeRowHeights) { listRef.current.recomputeRowHeights(); } - }, [currentlyLoading, cssMap, remoteRowCount]); + }, [currentlyLoading.current, cssMap, remoteRowCount]); + + useEffect(() => { + if (jobStatus && !isJobRunning(jobStatus)) { + if (jobSocketCounter.current > remoteRowCount && isMounted.current) { + setRemoteRowCount(jobSocketCounter.current); + } + + if (isMonitoringWebsocket) { + setIsMonitoringWebsocket(false); + } + + if (isFollowModeEnabled) { + setTimeout(() => setIsFollowModeEnabled(false), 1000); + } + } + }, [jobStatus]); const { error: cancelError, @@ -382,14 +411,14 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { } = useDismissableError(deleteError); const monitorJobSocketCounter = () => { + if (jobSocketCounter.current > remoteRowCount && isMounted.current) { + setRemoteRowCount(jobSocketCounter.current); + } if ( jobSocketCounter.current === remoteRowCount && !isJobRunning(job.status) ) { - clearInterval(interval.current); - } - if (jobSocketCounter.current > remoteRowCount && isMounted.current) { - setRemoteRowCount(jobSocketCounter.current); + setIsMonitoringWebsocket(false); } }; @@ -398,9 +427,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { if (isMounted.current) { setHasContentLoading(true); - setCurrentlyLoading(prevCurrentlyLoading => - prevCurrentlyLoading.concat(loadRange) - ); + currentlyLoading.current = currentlyLoading.current.concat(loadRange); } try { @@ -466,8 +493,8 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { } finally { if (isMounted.current) { setHasContentLoading(false); - setCurrentlyLoading(prevCurrentlyLoading => - prevCurrentlyLoading.filter(n => !loadRange.includes(n)) + currentlyLoading.current = currentlyLoading.current.filter( + n => !loadRange.includes(n) ); loadRange.forEach(n => { cache.clear(n); @@ -480,7 +507,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { if (results[index]) { return true; } - return currentlyLoading.includes(index); + return currentlyLoading.current.includes(index); }; const handleHostEventClick = hostEventToOpen => { @@ -493,6 +520,9 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { }; const rowRenderer = ({ index, parent, key, style }) => { + if (listRef.current && isFollowModeEnabled) { + setTimeout(() => scrollToRow(remoteRowCount - 1), 0); + } let actualLineTextHtml = []; if (results[index]) { const { lineTextHtml } = getLineTextHtml(results[index]); @@ -545,9 +575,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { ); if (isMounted.current) { - setCurrentlyLoading(prevCurrentlyLoading => - prevCurrentlyLoading.concat(loadRange) - ); + currentlyLoading.current = currentlyLoading.current.concat(loadRange); } const params = { @@ -574,8 +602,8 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { ...prevCssMap, ...newResultsCssMap, })); - setCurrentlyLoading(prevCurrentlyLoading => - prevCurrentlyLoading.filter(n => !loadRange.includes(n)) + currentlyLoading.current = currentlyLoading.current.filter( + n => !loadRange.includes(n) ); loadRange.forEach(n => { cache.clear(n); @@ -585,7 +613,9 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { }; const scrollToRow = rowIndex => { - listRef.current.scrollToRow(rowIndex); + if (listRef.current) { + listRef.current.scrollToRow(rowIndex); + } }; const handleScrollPrevious = () => { @@ -659,25 +689,26 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { }; const handleFollowToggle = () => { - if (isFollowEnabled) { + if (isFollowModeEnabled) { setIsFollowModeEnabled(false); } else { setIsFollowModeEnabled(true); scrollToRow(remoteRowCount - 1); } }; - const handleScroll = () => { - if (listRef?.current?.Grid?._renderedRowStopIndex < remoteRowCount - 1) { + + const handleScroll = e => { + if ( + isFollowModeEnabled && + scrollTop.current > e.scrollTop && + scrollHeight.current === e.scrollHeight + ) { setIsFollowModeEnabled(false); } + scrollTop.current = e.scrollTop; + scrollHeight.current = e.scrollHeight; }; - useEffect(() => { - if (isFollowEnabled) { - scrollToRow(remoteRowCount); - } - }, [remoteRowCount, isFollowEnabled]); - const renderSearchComponent = () => ( {isJobRunning(job.status) ? ( ) : null} @@ -800,7 +831,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { onScrollNext={handleScrollNext} onScrollPrevious={handleScrollPrevious} /> - + Date: Wed, 26 May 2021 12:59:09 -0400 Subject: [PATCH 19/20] Fix linting errors --- awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx b/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx index b439f86be9..9d4b61aa1f 100644 --- a/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx +++ b/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx @@ -359,7 +359,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { if (listRef.current?.recomputeRowHeights) { listRef.current.recomputeRowHeights(); } - }, [currentlyLoading.current, cssMap, remoteRowCount]); + }, [cssMap, remoteRowCount]); useEffect(() => { if (jobStatus && !isJobRunning(jobStatus)) { @@ -375,7 +375,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { setTimeout(() => setIsFollowModeEnabled(false), 1000); } } - }, [jobStatus]); + }, [jobStatus]); // eslint-disable-line react-hooks/exhaustive-deps const { error: cancelError, From 3cc6a4cf444b89d909a6cf241ae80aa75cc018be Mon Sep 17 00:00:00 2001 From: mabashian Date: Thu, 3 Jun 2021 09:29:12 -0400 Subject: [PATCH 20/20] Go back to tracking currentlyLoading via state and not ref --- .../src/screens/Job/JobOutput/JobOutput.jsx | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx b/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx index 9d4b61aa1f..527183f760 100644 --- a/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx +++ b/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx @@ -301,10 +301,10 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { const isMounted = useIsMounted(); const scrollTop = useRef(0); const scrollHeight = useRef(0); - const currentlyLoading = useRef([]); const history = useHistory(); const [contentError, setContentError] = useState(null); const [cssMap, setCssMap] = useState({}); + const [currentlyLoading, setCurrentlyLoading] = useState([]); const [hasContentLoading, setHasContentLoading] = useState(true); const [hostEvent, setHostEvent] = useState({}); const [isHostModalOpen, setIsHostModalOpen] = useState(false); @@ -359,7 +359,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { if (listRef.current?.recomputeRowHeights) { listRef.current.recomputeRowHeights(); } - }, [cssMap, remoteRowCount]); + }, [currentlyLoading, cssMap, remoteRowCount]); useEffect(() => { if (jobStatus && !isJobRunning(jobStatus)) { @@ -427,7 +427,9 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { if (isMounted.current) { setHasContentLoading(true); - currentlyLoading.current = currentlyLoading.current.concat(loadRange); + setCurrentlyLoading(prevCurrentlyLoading => + prevCurrentlyLoading.concat(loadRange) + ); } try { @@ -493,8 +495,8 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { } finally { if (isMounted.current) { setHasContentLoading(false); - currentlyLoading.current = currentlyLoading.current.filter( - n => !loadRange.includes(n) + setCurrentlyLoading(prevCurrentlyLoading => + prevCurrentlyLoading.filter(n => !loadRange.includes(n)) ); loadRange.forEach(n => { cache.clear(n); @@ -507,7 +509,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { if (results[index]) { return true; } - return currentlyLoading.current.includes(index); + return currentlyLoading.includes(index); }; const handleHostEventClick = hostEventToOpen => { @@ -575,7 +577,9 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { ); if (isMounted.current) { - currentlyLoading.current = currentlyLoading.current.concat(loadRange); + setCurrentlyLoading(prevCurrentlyLoading => + prevCurrentlyLoading.concat(loadRange) + ); } const params = { @@ -602,8 +606,8 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { ...prevCssMap, ...newResultsCssMap, })); - currentlyLoading.current = currentlyLoading.current.filter( - n => !loadRange.includes(n) + setCurrentlyLoading(prevCurrentlyLoading => + prevCurrentlyLoading.filter(n => !loadRange.includes(n)) ); loadRange.forEach(n => { cache.clear(n);