diff --git a/awx/main/conf.py b/awx/main/conf.py index c121dcda51..7d2f6f979a 100644 --- a/awx/main/conf.py +++ b/awx/main/conf.py @@ -362,6 +362,17 @@ register( category_slug='jobs', ) +register( + 'MAX_WEBSOCKET_EVENT_RATE', + field_class=fields.IntegerField, + min_value=0, + default=30, + label=_('Job Event Maximum Websocket Messages Per Second'), + help_text=_('Maximum number of messages to update the UI live job output with per second. Value of 0 means no limit.'), + category=_('Jobs'), + category_slug='jobs', +) + register( 'SCHEDULE_MAX_JOBS', field_class=fields.IntegerField, diff --git a/awx/main/constants.py b/awx/main/constants.py index b2d62d5ec9..fbd783b968 100644 --- a/awx/main/constants.py +++ b/awx/main/constants.py @@ -41,6 +41,7 @@ STANDARD_INVENTORY_UPDATE_ENV = { } CAN_CANCEL = ('new', 'pending', 'waiting', 'running') ACTIVE_STATES = CAN_CANCEL +MINIMAL_EVENTS = set(['playbook_on_play_start', 'playbook_on_task_start', 'playbook_on_stats', 'EOF']) CENSOR_VALUE = '************' ENV_BLOCKLIST = frozenset( ( diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index acfb0bce02..279db49bfb 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -142,7 +142,8 @@ class CallbackBrokerWorker(BaseWorker): logger.exception('Database Error Saving Job Event') duration_to_save = time.perf_counter() - duration_to_save for e in events: - emit_event_detail(e) + if not getattr(e, '_skip_websocket_message', False): + emit_event_detail(e) self.buff = {} self.last_flush = time.time() # only update metrics if we saved events @@ -207,7 +208,13 @@ class CallbackBrokerWorker(BaseWorker): GuidMiddleware.set_guid('') return + skip_websocket_message = body.pop('skip_websocket_message', False) + event = cls.create_from_data(**body) + + if skip_websocket_message: + event._skip_websocket_message = True + self.buff.setdefault(cls, []).append(event) retries = 0 diff --git a/awx/main/models/events.py b/awx/main/models/events.py index 222eb22438..0a3cef78d1 100644 --- a/awx/main/models/events.py +++ b/awx/main/models/events.py @@ -17,6 +17,7 @@ from awx.api.versioning import reverse from awx.main import consumers from awx.main.managers import DeferJobCreatedManager from awx.main.fields import JSONField +from awx.main.constants import MINIMAL_EVENTS from awx.main.models.base import CreatedModifiedModel from awx.main.utils import ignore_inventory_computed_fields, camelcase_to_underscore @@ -57,9 +58,6 @@ def create_host_status_counts(event_data): return dict(host_status_counts) -MINIMAL_EVENTS = set(['playbook_on_play_start', 'playbook_on_task_start', 'playbook_on_stats', 'EOF']) - - def emit_event_detail(event): if settings.UI_LIVE_UPDATES_ENABLED is False and event.event not in MINIMAL_EVENTS: return diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 317c3d6111..b1e41fbe52 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -4,7 +4,7 @@ # All Rights Reserved. # Python -from collections import OrderedDict, namedtuple +from collections import OrderedDict, namedtuple, deque import errno import functools import importlib @@ -57,7 +57,7 @@ from receptorctl.socket_interface import ReceptorControl # AWX from awx import __version__ as awx_application_version -from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV +from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV, MINIMAL_EVENTS from awx.main.access import access_registry from awx.main.redact import UriCleaner from awx.main.models import ( @@ -740,6 +740,7 @@ class BaseTask(object): self.host_map = {} self.guid = GuidMiddleware.get_guid() self.job_created = None + self.recent_event_timings = deque(maxlen=settings.MAX_WEBSOCKET_EVENT_RATE) def update_model(self, pk, _attempt=0, **updates): """Reload the model instance from the database and update the @@ -1151,6 +1152,37 @@ class BaseTask(object): if 'event_data' in event_data: event_data['event_data']['guid'] = self.guid + # To prevent overwhelming the broadcast queue, skip some websocket messages + if self.recent_event_timings: + cpu_time = time.time() + first_window_time = self.recent_event_timings[0] + last_window_time = self.recent_event_timings[-1] + + if event_data.get('event') in MINIMAL_EVENTS: + should_emit = True # always send some types like playbook_on_stats + elif event_data.get('stdout') == '' and event_data['start_line'] == event_data['end_line']: + should_emit = False # exclude events with no output + else: + should_emit = any( + [ + # if 30the most recent websocket message was sent over 1 second ago + cpu_time - first_window_time > 1.0, + # if the very last websocket message came in over 1/30 seconds ago + self.recent_event_timings.maxlen * (cpu_time - last_window_time) > 1.0, + # if the queue is not yet full + len(self.recent_event_timings) != self.recent_event_timings.maxlen, + ] + ) + + if should_emit: + self.recent_event_timings.append(cpu_time) + else: + event_data.setdefault('event_data', {}) + event_data['skip_websocket_message'] = True + + elif self.recent_event_timings.maxlen: + self.recent_event_timings.append(time.time()) + event_data.setdefault(self.event_data_key, self.instance.id) self.dispatcher.dispatch(event_data) self.event_ct += 1 diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 98561c98fa..6d7a978a06 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -197,8 +197,9 @@ UI_LIVE_UPDATES_ENABLED = True # beyond this limit and the value will be removed MAX_EVENT_RES_DATA = 700000 -# Note: This setting may be overridden by database settings. +# Note: These settings may be overridden by database settings. EVENT_STDOUT_MAX_BYTES_DISPLAY = 1024 +MAX_WEBSOCKET_EVENT_RATE = 30 # The amount of time before a stdout file is expired and removed locally # Note that this can be recreated if the stdout is downloaded diff --git a/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx b/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx index 976952afc7..527183f760 100644 --- a/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx +++ b/awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx @@ -38,6 +38,7 @@ import { HostStatusBar, OutputToolbar } from './shared'; import getRowRangePageSize from './shared/jobOutputUtils'; import { getJobModel, isJobRunning } from '../../../util/jobs'; import useRequest, { useDismissableError } from '../../../util/useRequest'; +import useInterval from '../../../util/useInterval'; import { parseQueryString, mergeParams, @@ -297,8 +298,9 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { const listRef = useRef(null); const previousWidth = useRef(0); const jobSocketCounter = useRef(0); - const interval = useRef(null); const isMounted = useIsMounted(); + const scrollTop = useRef(0); + const scrollHeight = useRef(0); const history = useHistory(); const [contentError, setContentError] = useState(null); const [cssMap, setCssMap] = useState({}); @@ -310,6 +312,17 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { const [showCancelModal, setShowCancelModal] = useState(false); const [remoteRowCount, setRemoteRowCount] = useState(0); const [results, setResults] = useState({}); + const [isFollowModeEnabled, setIsFollowModeEnabled] = useState( + isJobRunning(job.status) + ); + const [isMonitoringWebsocket, setIsMonitoringWebsocket] = useState(false); + + useInterval( + () => { + monitorJobSocketCounter(); + }, + isMonitoringWebsocket ? 5000 : null + ); useEffect(() => { loadJobEvents(); @@ -330,14 +343,15 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { } } }); - interval.current = setInterval(() => monitorJobSocketCounter(), 5000); + setIsMonitoringWebsocket(true); } return function cleanup() { if (ws) { ws.close(); } - clearInterval(interval.current); + setIsMonitoringWebsocket(false); + isMounted.current = false; }; }, [location.search]); // eslint-disable-line react-hooks/exhaustive-deps @@ -347,6 +361,22 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { } }, [currentlyLoading, cssMap, remoteRowCount]); + useEffect(() => { + if (jobStatus && !isJobRunning(jobStatus)) { + if (jobSocketCounter.current > remoteRowCount && isMounted.current) { + setRemoteRowCount(jobSocketCounter.current); + } + + if (isMonitoringWebsocket) { + setIsMonitoringWebsocket(false); + } + + if (isFollowModeEnabled) { + setTimeout(() => setIsFollowModeEnabled(false), 1000); + } + } + }, [jobStatus]); // eslint-disable-line react-hooks/exhaustive-deps + const { error: cancelError, isLoading: isCancelling, @@ -381,14 +411,14 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { } = useDismissableError(deleteError); const monitorJobSocketCounter = () => { + if (jobSocketCounter.current > remoteRowCount && isMounted.current) { + setRemoteRowCount(jobSocketCounter.current); + } if ( jobSocketCounter.current === remoteRowCount && !isJobRunning(job.status) ) { - clearInterval(interval.current); - } - if (jobSocketCounter.current > remoteRowCount && isMounted.current) { - setRemoteRowCount(jobSocketCounter.current); + setIsMonitoringWebsocket(false); } }; @@ -492,6 +522,9 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { }; const rowRenderer = ({ index, parent, key, style }) => { + if (listRef.current && isFollowModeEnabled) { + setTimeout(() => scrollToRow(remoteRowCount - 1), 0); + } let actualLineTextHtml = []; if (results[index]) { const { lineTextHtml } = getLineTextHtml(results[index]); @@ -584,7 +617,9 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { }; const scrollToRow = rowIndex => { - listRef.current.scrollToRow(rowIndex); + if (listRef.current) { + listRef.current.scrollToRow(rowIndex); + } }; const handleScrollPrevious = () => { @@ -604,7 +639,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { }; const handleScrollLast = () => { - scrollToRow(remoteRowCount); + scrollToRow(remoteRowCount - 1); }; const handleResize = ({ width }) => { @@ -657,6 +692,27 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { history.push(qs ? `${pathname}?${qs}` : pathname); }; + const handleFollowToggle = () => { + if (isFollowModeEnabled) { + setIsFollowModeEnabled(false); + } else { + setIsFollowModeEnabled(true); + scrollToRow(remoteRowCount - 1); + } + }; + + const handleScroll = e => { + if ( + isFollowModeEnabled && + scrollTop.current > e.scrollTop && + scrollHeight.current === e.scrollHeight + ) { + setIsFollowModeEnabled(false); + } + scrollTop.current = e.scrollTop; + scrollHeight.current = e.scrollHeight; + }; + const renderSearchComponent = () => ( + {isJobRunning(job.status) ? ( + + ) : null} )} diff --git a/awxkit/awxkit/ws.py b/awxkit/awxkit/ws.py index 41380d406e..d56fccf719 100644 --- a/awxkit/awxkit/ws.py +++ b/awxkit/awxkit/ws.py @@ -3,6 +3,7 @@ import logging import atexit import json import ssl +import datetime from queue import Queue, Empty from urllib.parse import urlparse @@ -50,7 +51,7 @@ class WSClient(object): # Subscription group types - def __init__(self, token=None, hostname='', port=443, secure=True, session_id=None, csrftoken=None): + def __init__(self, token=None, hostname='', port=443, secure=True, session_id=None, csrftoken=None, add_received_time=False): # delay this import, because this is an optional dependency import websocket @@ -90,6 +91,7 @@ class WSClient(object): self._message_cache = [] self._should_subscribe_to_pending_job = False self._pending_unsubscribe = threading.Event() + self._add_received_time = add_received_time def connect(self): wst = threading.Thread(target=self._ws_run_forever, args=(self.ws, {"cert_reqs": ssl.CERT_NONE})) @@ -195,6 +197,8 @@ class WSClient(object): def _on_message(self, message): message = json.loads(message) log.debug('received message: {}'.format(message)) + if self._add_received_time: + message['received_time'] = datetime.datetime.utcnow() if all([message.get('group_name') == 'jobs', message.get('status') == 'pending', message.get('unified_job_id'), self._should_subscribe_to_pending_job]): if bool(message.get('project_id')) == (self._should_subscribe_to_pending_job['events'] == 'project_update_events'):