JobOutput: extract multiple helper functions

This commit is contained in:
Keith J. Grant 2021-08-17 16:20:54 -07:00
parent b5708a8cc4
commit b5bc9bb3f4
5 changed files with 191 additions and 175 deletions

View File

@ -29,8 +29,11 @@ import PageControls from './PageControls';
import HostEventModal from './HostEventModal';
import JobOutputSearch from './JobOutputSearch';
import { HostStatusBar, OutputToolbar } from './shared';
import getRowRangePageSize from './shared/jobOutputUtils';
import getLineTextHtml from './getLineTextHtml';
import connectJobSocket, { closeWebSocket } from './connectJobSocket';
import getEventRequestParams from './getEventRequestParams';
import isHostEvent from './isHostEvent';
import { fetchCount, normalizeEvents } from './loadJobEvents';
const QS_CONFIG = getQSConfig('job_output', {
order_by: 'counter',
@ -77,101 +80,11 @@ const OutputFooter = styled.div`
flex: 1;
`;
let ws;
function connectJobSocket({ type, id }, onMessage) {
ws = new WebSocket(
`${window.location.protocol === 'http:' ? 'ws:' : 'wss:'}//${
window.location.host
}/websocket/`
);
ws.onopen = () => {
const xrftoken = `; ${document.cookie}`
.split('; csrftoken=')
.pop()
.split(';')
.shift();
const eventGroup = `${type}_events`;
ws.send(
JSON.stringify({
xrftoken,
groups: { jobs: ['summary', 'status_changed'], [eventGroup]: [id] },
})
);
};
ws.onmessage = (e) => {
onMessage(JSON.parse(e.data));
};
ws.onclose = (e) => {
if (e.code !== 1000) {
// eslint-disable-next-line no-console
console.debug('Socket closed. Reconnecting...', e);
setTimeout(() => {
connectJobSocket({ type, id }, onMessage);
}, 1000);
}
};
ws.onerror = (err) => {
// eslint-disable-next-line no-console
console.debug('Socket error: ', err, 'Disconnecting...');
ws.close();
};
}
function range(low, high) {
const numbers = [];
for (let n = low; n <= high; n++) {
numbers.push(n);
}
return numbers;
}
function isHostEvent(jobEvent) {
const { event, event_data, host, type } = jobEvent;
let isHost;
if (typeof host === 'number' || (event_data && event_data.res)) {
isHost = true;
} else if (
type === 'project_update_event' &&
event !== 'runner_on_skipped' &&
event_data.host
) {
isHost = true;
} else {
isHost = false;
}
return isHost;
}
const cache = new CellMeasurerCache({
fixedWidth: true,
defaultHeight: 25,
});
const getEventRequestParams = (job, remoteRowCount, requestRange) => {
const [startIndex, stopIndex] = requestRange;
if (isJobRunning(job?.status)) {
return [
{ counter__gte: startIndex, limit: stopIndex - startIndex + 1 },
range(startIndex, Math.min(stopIndex, remoteRowCount)),
startIndex,
];
}
const { page, pageSize, firstIndex } = getRowRangePageSize(
startIndex,
stopIndex
);
const loadRange = range(
firstIndex,
Math.min(firstIndex + pageSize, remoteRowCount)
);
return [{ page, page_size: pageSize }, loadRange, firstIndex];
};
function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
const location = useLocation();
const listRef = useRef(null);
@ -226,9 +139,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
}
return function cleanup() {
if (ws) {
ws.close();
}
closeWebSocket();
setIsMonitoringWebsocket(false);
isMounted.current = false;
};
@ -312,73 +223,29 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
...parseQueryString(QS_CONFIG, location.search),
});
let countRequest;
if (isJobRunning(job?.status)) {
// If the job is running, it means we're using limit-offset pagination. Requests
// with limit-offset pagination won't return a total event count for performance
// reasons. In this situation, we derive the remote row count by using the highest
// counter available in the database.
countRequest = async () => {
const {
data: { results: lastEvents = [] },
} = await getJobModel(job.type).readEvents(job.id, {
order_by: '-counter',
limit: 1,
});
return lastEvents.length >= 1 ? lastEvents[0].counter : 0;
};
} else {
countRequest = async () => {
const {
data: { count: eventCount },
} = await eventPromise;
return eventCount;
};
}
try {
const [
{
data: { results: fetchedEvents = [] },
},
count,
] = await Promise.all([eventPromise, countRequest()]);
] = await Promise.all([eventPromise, fetchCount(job, eventPromise)]);
if (isMounted.current) {
let countOffset = 0;
if (job?.result_traceback) {
const tracebackEvent = {
counter: 1,
created: null,
event: null,
type: null,
stdout: job?.result_traceback,
start_line: 0,
};
const firstIndex = fetchedEvents.findIndex(
(jobEvent) => jobEvent.counter === 1
);
if (firstIndex && fetchedEvents[firstIndex]?.stdout) {
const stdoutLines = fetchedEvents[firstIndex].stdout.split('\r\n');
stdoutLines[0] = tracebackEvent.stdout;
fetchedEvents[firstIndex].stdout = stdoutLines.join('\r\n');
} else {
countOffset += 1;
fetchedEvents.unshift(tracebackEvent);
}
}
const newResults = {};
let newResultsCssMap = {};
fetchedEvents.forEach((jobEvent, index) => {
newResults[index] = jobEvent;
const { lineCssMap } = getLineTextHtml(jobEvent);
newResultsCssMap = { ...newResultsCssMap, ...lineCssMap };
});
setResults(newResults);
setRemoteRowCount(count + countOffset);
setCssMap(newResultsCssMap);
if (!isMounted.current) {
return;
}
const { events, countOffset } = normalizeEvents(job, fetchedEvents);
const newResults = {};
let newResultsCssMap = {};
events.forEach((jobEvent, index) => {
newResults[index] = jobEvent;
const { lineCssMap } = getLineTextHtml(jobEvent);
newResultsCssMap = { ...newResultsCssMap, ...lineCssMap };
});
setResults(newResults);
setRemoteRowCount(count + countOffset);
setCssMap(newResultsCssMap);
} catch (err) {
setContentError(err);
} finally {
@ -479,29 +346,31 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
return getJobModel(job.type)
.readEvents(job.id, params)
.then((response) => {
if (isMounted.current) {
const newResults = {};
let newResultsCssMap = {};
response.data.results.forEach((jobEvent, index) => {
newResults[firstIndex + index] = jobEvent;
const { lineCssMap } = getLineTextHtml(jobEvent);
newResultsCssMap = { ...newResultsCssMap, ...lineCssMap };
});
setResults((prevResults) => ({
...prevResults,
...newResults,
}));
setCssMap((prevCssMap) => ({
...prevCssMap,
...newResultsCssMap,
}));
setCurrentlyLoading((prevCurrentlyLoading) =>
prevCurrentlyLoading.filter((n) => !loadRange.includes(n))
);
loadRange.forEach((n) => {
cache.clear(n);
});
if (!isMounted.current) {
return;
}
const newResults = {};
let newResultsCssMap = {};
response.data.results.forEach((jobEvent, index) => {
newResults[firstIndex + index] = jobEvent;
const { lineCssMap } = getLineTextHtml(jobEvent);
newResultsCssMap = { ...newResultsCssMap, ...lineCssMap };
});
setResults((prevResults) => ({
...prevResults,
...newResults,
}));
setCssMap((prevCssMap) => ({
...prevCssMap,
...newResultsCssMap,
}));
setCurrentlyLoading((prevCurrentlyLoading) =>
prevCurrentlyLoading.filter((n) => !loadRange.includes(n))
);
loadRange.forEach((n) => {
cache.clear(n);
});
});
};

View File

@ -0,0 +1,50 @@
let ws;
export default function connectJobSocket({ type, id }, onMessage) {
ws = new WebSocket(
`${window.location.protocol === 'http:' ? 'ws:' : 'wss:'}//${
window.location.host
}/websocket/`
);
ws.onopen = () => {
const xrftoken = `; ${document.cookie}`
.split('; csrftoken=')
.pop()
.split(';')
.shift();
const eventGroup = `${type}_events`;
ws.send(
JSON.stringify({
xrftoken,
groups: { jobs: ['summary', 'status_changed'], [eventGroup]: [id] },
})
);
};
ws.onmessage = (e) => {
onMessage(JSON.parse(e.data));
};
ws.onclose = (e) => {
if (e.code !== 1000) {
// eslint-disable-next-line no-console
console.debug('Socket closed. Reconnecting...', e);
setTimeout(() => {
connectJobSocket({ type, id }, onMessage);
}, 1000);
}
};
ws.onerror = (err) => {
// eslint-disable-next-line no-console
console.debug('Socket error: ', err, 'Disconnecting...');
ws.close();
};
}
export function closeWebSocket() {
if (ws) {
ws.close();
}
}

View File

@ -0,0 +1,35 @@
import { isJobRunning } from 'util/jobs';
import getRowRangePageSize from './shared/jobOutputUtils';
export default function getEventRequestParams(
job,
remoteRowCount,
requestRange
) {
const [startIndex, stopIndex] = requestRange;
if (isJobRunning(job?.status)) {
return [
{ counter__gte: startIndex, limit: stopIndex - startIndex + 1 },
range(startIndex, Math.min(stopIndex, remoteRowCount)),
startIndex,
];
}
const { page, pageSize, firstIndex } = getRowRangePageSize(
startIndex,
stopIndex
);
const loadRange = range(
firstIndex,
Math.min(firstIndex + pageSize, remoteRowCount)
);
return [{ page, page_size: pageSize }, loadRange, firstIndex];
}
function range(low, high) {
const numbers = [];
for (let n = low; n <= high; n++) {
numbers.push(n);
}
return numbers;
}

View File

@ -0,0 +1,16 @@
export default function isHostEvent(jobEvent) {
const { event, event_data, host, type } = jobEvent;
let isHost;
if (typeof host === 'number' || (event_data && event_data.res)) {
isHost = true;
} else if (
type === 'project_update_event' &&
event !== 'runner_on_skipped' &&
event_data.host
) {
isHost = true;
} else {
isHost = false;
}
return isHost;
}

View File

@ -0,0 +1,46 @@
import { getJobModel, isJobRunning } from 'util/jobs';
export async function fetchCount(job, eventPromise) {
if (isJobRunning(job?.status)) {
const {
data: { results: lastEvents = [] },
} = await getJobModel(job.type).readEvents(job.id, {
order_by: '-counter',
limit: 1,
});
return lastEvents.length >= 1 ? lastEvents[0].counter : 0;
}
const {
data: { count: eventCount },
} = await eventPromise;
return eventCount;
}
export function normalizeEvents(job, events) {
let countOffset = 0;
if (job?.result_traceback) {
const tracebackEvent = {
counter: 1,
created: null,
event: null,
type: null,
stdout: job?.result_traceback,
start_line: 0,
};
const firstIndex = events.findIndex((jobEvent) => jobEvent.counter === 1);
if (firstIndex && events[firstIndex]?.stdout) {
const stdoutLines = events[firstIndex].stdout.split('\r\n');
stdoutLines[0] = tracebackEvent.stdout;
events[firstIndex].stdout = stdoutLines.join('\r\n');
} else {
countOffset += 1;
events.unshift(tracebackEvent);
}
}
return {
events,
countOffset,
};
}