Merge pull request #10053 from AlanCoding/dropsies

Intentionally drop job event websocket messages in excess of 30 per second (configurable)

SUMMARY
The UI no longer follows the latest job events from websocket messages. Because of that, there's no reason to send messages for all events if the job event rate is high.
I used 30 because this is the number of events that I guesstimate will show in one page in the UI.
Needs the setting added in the UI.
This adds skip_websocket_message to event event_data. We could promote it to a top-level key for job events, if that is preferable aesthetically. Doing this allows us to test this feature without having to connect a websocket client. Ping @mabashian @chrismeyersfsu
ISSUE TYPE

Feature Pull Request

COMPONENT NAME

API
UI

ADDITIONAL INFORMATION
Scenario walkthrough:
a job is producing 1,000 events per second. User launches it, the screen fills up in, say 1/4 of a second. The scrollbar indicates content beyond the bottom of the screen. Now, for 3/4ths of a second, the scrollbar stays still. After that, it updates the scrollbar to the current line number that the job is on. The scrollbar continues to update the length of the output effectively once per second.

Reviewed-by: Alan Rominger <arominge@redhat.com>
Reviewed-by: Chris Meyers <None>
Reviewed-by: Jake McDermott <yo@jakemcdermott.me>
This commit is contained in:
softwarefactory-project-zuul[bot] 2021-06-08 20:10:45 +00:00 committed by GitHub
commit 3340ef9c91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 136 additions and 17 deletions

View File

@ -362,6 +362,17 @@ register(
category_slug='jobs',
)
register(
'MAX_WEBSOCKET_EVENT_RATE',
field_class=fields.IntegerField,
min_value=0,
default=30,
label=_('Job Event Maximum Websocket Messages Per Second'),
help_text=_('Maximum number of messages to update the UI live job output with per second. Value of 0 means no limit.'),
category=_('Jobs'),
category_slug='jobs',
)
register(
'SCHEDULE_MAX_JOBS',
field_class=fields.IntegerField,

View File

@ -41,6 +41,7 @@ STANDARD_INVENTORY_UPDATE_ENV = {
}
CAN_CANCEL = ('new', 'pending', 'waiting', 'running')
ACTIVE_STATES = CAN_CANCEL
MINIMAL_EVENTS = set(['playbook_on_play_start', 'playbook_on_task_start', 'playbook_on_stats', 'EOF'])
CENSOR_VALUE = '************'
ENV_BLOCKLIST = frozenset(
(

View File

@ -142,7 +142,8 @@ class CallbackBrokerWorker(BaseWorker):
logger.exception('Database Error Saving Job Event')
duration_to_save = time.perf_counter() - duration_to_save
for e in events:
emit_event_detail(e)
if not getattr(e, '_skip_websocket_message', False):
emit_event_detail(e)
self.buff = {}
self.last_flush = time.time()
# only update metrics if we saved events
@ -207,7 +208,13 @@ class CallbackBrokerWorker(BaseWorker):
GuidMiddleware.set_guid('')
return
skip_websocket_message = body.pop('skip_websocket_message', False)
event = cls.create_from_data(**body)
if skip_websocket_message:
event._skip_websocket_message = True
self.buff.setdefault(cls, []).append(event)
retries = 0

View File

@ -17,6 +17,7 @@ from awx.api.versioning import reverse
from awx.main import consumers
from awx.main.managers import DeferJobCreatedManager
from awx.main.fields import JSONField
from awx.main.constants import MINIMAL_EVENTS
from awx.main.models.base import CreatedModifiedModel
from awx.main.utils import ignore_inventory_computed_fields, camelcase_to_underscore
@ -57,9 +58,6 @@ def create_host_status_counts(event_data):
return dict(host_status_counts)
MINIMAL_EVENTS = set(['playbook_on_play_start', 'playbook_on_task_start', 'playbook_on_stats', 'EOF'])
def emit_event_detail(event):
if settings.UI_LIVE_UPDATES_ENABLED is False and event.event not in MINIMAL_EVENTS:
return

View File

@ -4,7 +4,7 @@
# All Rights Reserved.
# Python
from collections import OrderedDict, namedtuple
from collections import OrderedDict, namedtuple, deque
import errno
import functools
import importlib
@ -57,7 +57,7 @@ from receptorctl.socket_interface import ReceptorControl
# AWX
from awx import __version__ as awx_application_version
from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV
from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV, MINIMAL_EVENTS
from awx.main.access import access_registry
from awx.main.redact import UriCleaner
from awx.main.models import (
@ -740,6 +740,7 @@ class BaseTask(object):
self.host_map = {}
self.guid = GuidMiddleware.get_guid()
self.job_created = None
self.recent_event_timings = deque(maxlen=settings.MAX_WEBSOCKET_EVENT_RATE)
def update_model(self, pk, _attempt=0, **updates):
"""Reload the model instance from the database and update the
@ -1151,6 +1152,37 @@ class BaseTask(object):
if 'event_data' in event_data:
event_data['event_data']['guid'] = self.guid
# To prevent overwhelming the broadcast queue, skip some websocket messages
if self.recent_event_timings:
cpu_time = time.time()
first_window_time = self.recent_event_timings[0]
last_window_time = self.recent_event_timings[-1]
if event_data.get('event') in MINIMAL_EVENTS:
should_emit = True # always send some types like playbook_on_stats
elif event_data.get('stdout') == '' and event_data['start_line'] == event_data['end_line']:
should_emit = False # exclude events with no output
else:
should_emit = any(
[
# if 30the most recent websocket message was sent over 1 second ago
cpu_time - first_window_time > 1.0,
# if the very last websocket message came in over 1/30 seconds ago
self.recent_event_timings.maxlen * (cpu_time - last_window_time) > 1.0,
# if the queue is not yet full
len(self.recent_event_timings) != self.recent_event_timings.maxlen,
]
)
if should_emit:
self.recent_event_timings.append(cpu_time)
else:
event_data.setdefault('event_data', {})
event_data['skip_websocket_message'] = True
elif self.recent_event_timings.maxlen:
self.recent_event_timings.append(time.time())
event_data.setdefault(self.event_data_key, self.instance.id)
self.dispatcher.dispatch(event_data)
self.event_ct += 1

View File

@ -197,8 +197,9 @@ UI_LIVE_UPDATES_ENABLED = True
# beyond this limit and the value will be removed
MAX_EVENT_RES_DATA = 700000
# Note: This setting may be overridden by database settings.
# Note: These settings may be overridden by database settings.
EVENT_STDOUT_MAX_BYTES_DISPLAY = 1024
MAX_WEBSOCKET_EVENT_RATE = 30
# The amount of time before a stdout file is expired and removed locally
# Note that this can be recreated if the stdout is downloaded

View File

@ -38,6 +38,7 @@ import { HostStatusBar, OutputToolbar } from './shared';
import getRowRangePageSize from './shared/jobOutputUtils';
import { getJobModel, isJobRunning } from '../../../util/jobs';
import useRequest, { useDismissableError } from '../../../util/useRequest';
import useInterval from '../../../util/useInterval';
import {
parseQueryString,
mergeParams,
@ -297,8 +298,9 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
const listRef = useRef(null);
const previousWidth = useRef(0);
const jobSocketCounter = useRef(0);
const interval = useRef(null);
const isMounted = useIsMounted();
const scrollTop = useRef(0);
const scrollHeight = useRef(0);
const history = useHistory();
const [contentError, setContentError] = useState(null);
const [cssMap, setCssMap] = useState({});
@ -310,6 +312,17 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
const [showCancelModal, setShowCancelModal] = useState(false);
const [remoteRowCount, setRemoteRowCount] = useState(0);
const [results, setResults] = useState({});
const [isFollowModeEnabled, setIsFollowModeEnabled] = useState(
isJobRunning(job.status)
);
const [isMonitoringWebsocket, setIsMonitoringWebsocket] = useState(false);
useInterval(
() => {
monitorJobSocketCounter();
},
isMonitoringWebsocket ? 5000 : null
);
useEffect(() => {
loadJobEvents();
@ -330,14 +343,15 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
}
}
});
interval.current = setInterval(() => monitorJobSocketCounter(), 5000);
setIsMonitoringWebsocket(true);
}
return function cleanup() {
if (ws) {
ws.close();
}
clearInterval(interval.current);
setIsMonitoringWebsocket(false);
isMounted.current = false;
};
}, [location.search]); // eslint-disable-line react-hooks/exhaustive-deps
@ -347,6 +361,22 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
}
}, [currentlyLoading, cssMap, remoteRowCount]);
useEffect(() => {
if (jobStatus && !isJobRunning(jobStatus)) {
if (jobSocketCounter.current > remoteRowCount && isMounted.current) {
setRemoteRowCount(jobSocketCounter.current);
}
if (isMonitoringWebsocket) {
setIsMonitoringWebsocket(false);
}
if (isFollowModeEnabled) {
setTimeout(() => setIsFollowModeEnabled(false), 1000);
}
}
}, [jobStatus]); // eslint-disable-line react-hooks/exhaustive-deps
const {
error: cancelError,
isLoading: isCancelling,
@ -381,14 +411,14 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
} = useDismissableError(deleteError);
const monitorJobSocketCounter = () => {
if (jobSocketCounter.current > remoteRowCount && isMounted.current) {
setRemoteRowCount(jobSocketCounter.current);
}
if (
jobSocketCounter.current === remoteRowCount &&
!isJobRunning(job.status)
) {
clearInterval(interval.current);
}
if (jobSocketCounter.current > remoteRowCount && isMounted.current) {
setRemoteRowCount(jobSocketCounter.current);
setIsMonitoringWebsocket(false);
}
};
@ -492,6 +522,9 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
};
const rowRenderer = ({ index, parent, key, style }) => {
if (listRef.current && isFollowModeEnabled) {
setTimeout(() => scrollToRow(remoteRowCount - 1), 0);
}
let actualLineTextHtml = [];
if (results[index]) {
const { lineTextHtml } = getLineTextHtml(results[index]);
@ -584,7 +617,9 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
};
const scrollToRow = rowIndex => {
listRef.current.scrollToRow(rowIndex);
if (listRef.current) {
listRef.current.scrollToRow(rowIndex);
}
};
const handleScrollPrevious = () => {
@ -604,7 +639,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
};
const handleScrollLast = () => {
scrollToRow(remoteRowCount);
scrollToRow(remoteRowCount - 1);
};
const handleResize = ({ width }) => {
@ -657,6 +692,27 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
history.push(qs ? `${pathname}?${qs}` : pathname);
};
const handleFollowToggle = () => {
if (isFollowModeEnabled) {
setIsFollowModeEnabled(false);
} else {
setIsFollowModeEnabled(true);
scrollToRow(remoteRowCount - 1);
}
};
const handleScroll = e => {
if (
isFollowModeEnabled &&
scrollTop.current > e.scrollTop &&
scrollHeight.current === e.scrollHeight
) {
setIsFollowModeEnabled(false);
}
scrollTop.current = e.scrollTop;
scrollHeight.current = e.scrollHeight;
};
const renderSearchComponent = () => (
<Search
qsConfig={QS_CONFIG}
@ -763,6 +819,14 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
)}
</ToolbarItem>
</ToolbarToggleGroup>
{isJobRunning(job.status) ? (
<Button
variant={isFollowModeEnabled ? 'secondary' : 'primary'}
onClick={handleFollowToggle}
>
{isFollowModeEnabled ? t`Unfollow` : t`Follow`}
</Button>
) : null}
</SearchToolbarContent>
</SearchToolbar>
<PageControls
@ -801,6 +865,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
scrollToAlignment="start"
width={width || 1}
overscanRowCount={20}
onScroll={handleScroll}
/>
)}
</>

View File

@ -3,6 +3,7 @@ import logging
import atexit
import json
import ssl
import datetime
from queue import Queue, Empty
from urllib.parse import urlparse
@ -50,7 +51,7 @@ class WSClient(object):
# Subscription group types
def __init__(self, token=None, hostname='', port=443, secure=True, session_id=None, csrftoken=None):
def __init__(self, token=None, hostname='', port=443, secure=True, session_id=None, csrftoken=None, add_received_time=False):
# delay this import, because this is an optional dependency
import websocket
@ -90,6 +91,7 @@ class WSClient(object):
self._message_cache = []
self._should_subscribe_to_pending_job = False
self._pending_unsubscribe = threading.Event()
self._add_received_time = add_received_time
def connect(self):
wst = threading.Thread(target=self._ws_run_forever, args=(self.ws, {"cert_reqs": ssl.CERT_NONE}))
@ -195,6 +197,8 @@ class WSClient(object):
def _on_message(self, message):
message = json.loads(message)
log.debug('received message: {}'.format(message))
if self._add_received_time:
message['received_time'] = datetime.datetime.utcnow()
if all([message.get('group_name') == 'jobs', message.get('status') == 'pending', message.get('unified_job_id'), self._should_subscribe_to_pending_job]):
if bool(message.get('project_id')) == (self._should_subscribe_to_pending_job['events'] == 'project_update_events'):