add max event count and discarding to stream service

This commit is contained in:
Jake McDermott
2018-08-18 23:34:01 -04:00
parent 2187655c68
commit 38b9b47e6b
5 changed files with 137 additions and 64 deletions

View File

@@ -16,6 +16,7 @@ export const JOB_STATUS_FINISHED = JOB_STATUS_COMPLETE.concat(JOB_STATUS_INCOMPL
export const OUTPUT_ELEMENT_CONTAINER = '.at-Stdout-container'; export const OUTPUT_ELEMENT_CONTAINER = '.at-Stdout-container';
export const OUTPUT_ELEMENT_TBODY = '#atStdoutResultTable'; export const OUTPUT_ELEMENT_TBODY = '#atStdoutResultTable';
export const OUTPUT_ELEMENT_LAST = '#atStdoutMenuLast'; export const OUTPUT_ELEMENT_LAST = '#atStdoutMenuLast';
export const OUTPUT_MAX_BUFFER_LENGTH = 500;
export const OUTPUT_MAX_LAG = 120; export const OUTPUT_MAX_LAG = 120;
export const OUTPUT_NO_COUNT_JOB_TYPES = ['ad_hoc_command', 'system_job', 'inventory_update']; export const OUTPUT_NO_COUNT_JOB_TYPES = ['ad_hoc_command', 'system_job', 'inventory_update'];
export const OUTPUT_ORDER_BY = 'counter'; export const OUTPUT_ORDER_BY = 'counter';

View File

@@ -177,8 +177,7 @@ function canStartFollowing () {
if (followOnce && // one-time activation from top of first page if (followOnce && // one-time activation from top of first page
scroll.isBeyondUpperThreshold() && scroll.isBeyondUpperThreshold() &&
slide.getHeadCounter() === 1 && slide.getTailCounter() - slide.getHeadCounter() >= OUTPUT_PAGE_SIZE) {
slide.getTailCounter() >= OUTPUT_PAGE_SIZE) {
followOnce = false; followOnce = false;
return true; return true;
@@ -386,7 +385,7 @@ function startListening () {
function handleJobEvent (data) { function handleJobEvent (data) {
streaming = streaming || resource.events streaming = streaming || resource.events
.getRange([Math.max(0, data.counter - 50), data.counter + 50]) .getRange([Math.max(1, data.counter - 50), data.counter + 50])
.then(results => { .then(results => {
results.push(data); results.push(data);
@@ -406,12 +405,13 @@ function handleJobEvent (data) {
results = results.filter(({ counter }) => counter > maxMissing); results = results.filter(({ counter }) => counter > maxMissing);
} }
stream.setMissingCounterThreshold(max);
results.forEach(item => { results.forEach(item => {
stream.pushJobEvent(item); stream.pushJobEvent(item);
status.pushJobEvent(item); status.pushJobEvent(item);
}); });
stream.setMissingCounterThreshold(min);
return $q.resolve(); return $q.resolve();
}); });

View File

@@ -31,6 +31,8 @@ const pattern = [
const re = new RegExp(pattern); const re = new RegExp(pattern);
const hasAnsi = input => re.test(input); const hasAnsi = input => re.test(input);
const MISSING_EVENT_GROUP = 'MISSING_EVENT_GROUP';
function JobRenderService ($q, $sce, $window) { function JobRenderService ($q, $sce, $window) {
this.init = ({ compile, toggles }) => { this.init = ({ compile, toggles }) => {
this.parent = null; this.parent = null;
@@ -39,6 +41,7 @@ function JobRenderService ($q, $sce, $window) {
this.hooks = { compile }; this.hooks = { compile };
this.createToggles = toggles; this.createToggles = toggles;
this.lastMissing = false;
this.state = { this.state = {
collapseAll: false collapseAll: false
}; };
@@ -76,10 +79,21 @@ function JobRenderService ($q, $sce, $window) {
}; };
this.transformEvent = event => { this.transformEvent = event => {
if (this.record[event.uuid]) { if (event.uuid && this.record[event.uuid]) {
return { html: '', count: 0 }; return { html: '', count: 0 };
} }
if (event.event === MISSING_EVENT_GROUP) {
if (this.lastMissing) {
return { html: '', count: 0 };
}
this.lastMissing = true;
return this.transformMissingEvent(event);
}
this.lastMissing = false;
if (!event || !event.stdout) { if (!event || !event.stdout) {
return { html: '', count: 0 }; return { html: '', count: 0 };
} }
@@ -110,6 +124,13 @@ function JobRenderService ($q, $sce, $window) {
return { html, count }; return { html, count };
}; };
this.transformMissingEvent = () => {
const html = '<div class="at-Stdout-row"><div class="at-Stdout-line">...</div></div>';
const count = 1;
return { html, count };
};
this.isHostEvent = (event) => { this.isHostEvent = (event) => {
if (typeof event.host === 'number') { if (typeof event.host === 'number') {
return true; return true;

View File

@@ -75,7 +75,7 @@ function SlidingWindowService ($q) {
}; };
this.getBoundedRange = range => { this.getBoundedRange = range => {
const bounds = [1, this.getMaxCounter()]; const bounds = [0, this.getMaxCounter()];
return [Math.max(range[0], bounds[0]), Math.min(range[1], bounds[1])]; return [Math.max(range[0], bounds[0]), Math.min(range[1], bounds[1])];
}; };
@@ -388,6 +388,10 @@ function SlidingWindowService ($q) {
this.buffer.max = max; this.buffer.max = max;
this.buffer.count = count; this.buffer.count = count;
if (tail - head === 0) {
return frames;
}
if (min >= head && min <= tail + 1) { if (min >= head && min <= tail + 1) {
return frames.filter(({ counter }) => counter > tail); return frames.filter(({ counter }) => counter > tail);
} }
@@ -398,14 +402,21 @@ function SlidingWindowService ($q) {
this.getFrames = () => $q.resolve(this.buffer.events); this.getFrames = () => $q.resolve(this.buffer.events);
this.getMaxCounter = () => { this.getMaxCounter = () => {
if (this.buffer.min) { if (this.buffer.min && this.buffer.min > 1) {
return this.buffer.min; return this.buffer.min - 1;
} }
return this.api.getMaxCounter(); return this.api.getMaxCounter();
}; };
this.isOnLastPage = () => this.getTailCounter() >= (this.getMaxCounter() - OUTPUT_PAGE_SIZE); this.isOnLastPage = () => {
if (this.getTailCounter() === 0) {
return true;
}
return this.getTailCounter() >= (this.getMaxCounter() - OUTPUT_PAGE_SIZE);
};
this.getRange = () => [this.getHeadCounter(), this.getTailCounter()]; this.getRange = () => [this.getHeadCounter(), this.getTailCounter()];
this.getRecordCount = () => Object.keys(this.lines).length; this.getRecordCount = () => Object.keys(this.lines).length;
this.getCapacity = () => OUTPUT_EVENT_LIMIT - this.getRecordCount(); this.getCapacity = () => OUTPUT_EVENT_LIMIT - this.getRecordCount();

View File

@@ -1,6 +1,7 @@
/* eslint camelcase: 0 */ /* eslint camelcase: 0 */
import { import {
EVENT_STATS_PLAY, EVENT_STATS_PLAY,
OUTPUT_MAX_BUFFER_LENGTH,
OUTPUT_MAX_LAG, OUTPUT_MAX_LAG,
OUTPUT_PAGE_SIZE, OUTPUT_PAGE_SIZE,
} from './constants'; } from './constants';
@@ -15,16 +16,6 @@ function OutputStream ($q) {
onStop, onStop,
}; };
this.state = {
ending: false,
ended: false,
};
this.lag = 0;
this.chain = $q.resolve();
this.factors = this.calcFactors(OUTPUT_PAGE_SIZE);
this.setFramesPerRender();
this.bufferInit(); this.bufferInit();
}; };
@@ -32,34 +23,27 @@ function OutputStream ($q) {
rx.length = 0; rx.length = 0;
this.counters = { this.counters = {
total: 0, min: 1,
min: 0, max: -1,
max: null, ready: -1,
final: null, final: null,
ready: [],
used: [], used: [],
missing: [], missing: [],
total: 0,
length: 0,
}; };
};
this.bufferEmpty = (minReady, maxReady) => { this.state = {
let removed = []; ending: false,
ended: false,
overflow: false,
};
for (let i = rx.length - 1; i >= 0; i--) { this.lag = 0;
if (rx[i].counter <= maxReady) { this.chain = $q.resolve();
removed = removed.concat(rx.splice(i, 1));
}
}
return removed; this.factors = this.calcFactors(OUTPUT_PAGE_SIZE);
}; this.setFramesPerRender();
this.bufferAdd = event => {
rx.push(event);
this.counters.total += 1;
return this.counters.total;
}; };
this.calcFactors = size => { this.calcFactors = size => {
@@ -90,34 +74,88 @@ function OutputStream ($q) {
} }
}; };
this.checkCounter = ({ counter }) => { this.bufferAdd = event => {
this.counters.used.push(counter); const { counter } = event;
if (!this.counters.max || this.counters.max < counter) { if (counter > this.counters.max) {
this.counters.max = counter; this.counters.max = counter;
} }
let ready; let ready;
const used = [];
const missing = []; const missing = [];
for (let i = this.counters.min; i <= this.counters.max; i++) { for (let i = this.counters.min; i <= this.counters.max; i++) {
if (this.counters.used.indexOf(i) === -1) { if (this.counters.used.indexOf(i) === -1) {
missing.push(i); if (i === counter) {
rx.push(event);
used.push(i);
this.counters.length += 1;
} else {
missing.push(i);
}
} else {
used.push(i);
} }
} }
const excess = this.counters.length - OUTPUT_MAX_BUFFER_LENGTH;
this.state.overflow = (excess > 0);
if (missing.length === 0) { if (missing.length === 0) {
ready = this.counters.max; ready = this.counters.max;
} else if (this.state.overflow) {
const removed = Math.min(missing.length, excess);
ready = missing[removed - 1] - 1;
} else { } else {
ready = missing[0] - 1; ready = missing[0] - 1;
} }
this.counters.ready = [this.counters.min, ready]; this.counters.total += 1;
this.counters.min = ready + 1; this.counters.ready = ready;
this.counters.used = this.counters.used.filter(c => c > ready); this.counters.used = used;
this.counters.missing = missing; this.counters.missing = missing;
};
return this.counters.ready; this.bufferEmpty = threshold => {
let removed = [];
for (let i = rx.length - 1; i >= 0; i--) {
if (rx[i].counter <= threshold) {
removed = removed.concat(rx.splice(i, 1));
}
}
this.counters.min = threshold + 1;
this.counters.used = this.counters.used.filter(c => c > threshold);
this.counters.length = rx.length;
return removed;
};
this.isReadyToRender = () => {
const { total } = this.counters;
const readyCount = this.counters.ready - this.counters.min;
if (readyCount <= 0) {
return false;
}
if (this.state.ending) {
return true;
}
if (total % this.framesPerRender === 0) {
return true;
}
if (total < OUTPUT_PAGE_SIZE) {
if (readyCount % this.framesPerRender === 0) {
return true;
}
}
return false;
}; };
this.pushJobEvent = data => { this.pushJobEvent = data => {
@@ -130,25 +168,24 @@ function OutputStream ($q) {
this.counters.final = data.counter; this.counters.final = data.counter;
} }
const [minReady, maxReady] = this.checkCounter(data); this.bufferAdd(data);
const count = this.bufferAdd(data);
if (count % OUTPUT_PAGE_SIZE === 0) { if (this.counters.total % OUTPUT_PAGE_SIZE === 0) {
this.setFramesPerRender(); this.setFramesPerRender();
} }
const isReady = maxReady && (this.state.ending || if (!this.isReadyToRender()) {
count % this.framesPerRender === 0 ||
count < OUTPUT_PAGE_SIZE && (maxReady - minReady) % this.framesPerRender === 0);
if (!isReady) {
return $q.resolve(); return $q.resolve();
} }
const isLastFrame = this.state.ending && (maxReady >= this.counters.final); const isLast = this.state.ending && (this.counters.ready >= this.counters.final);
const events = this.bufferEmpty(minReady, maxReady); const events = this.bufferEmpty(this.counters.ready);
return this.emitFrames(events, isLastFrame); if (events.length > 0) {
return this.emitFrames(events, isLast);
}
return $q.resolve();
}) })
.then(() => --this.lag); .then(() => --this.lag);
@@ -161,16 +198,20 @@ function OutputStream ($q) {
this.state.ending = true; this.state.ending = true;
this.counters.final = counter; this.counters.final = counter;
if (counter >= this.counters.min) { if (counter > this.counters.ready) {
return $q.resolve(); return $q.resolve();
} }
const readyCount = this.counters.ready - this.counters.min;
let events = []; let events = [];
if (this.counters.ready.length > 0) { if (readyCount > 0) {
events = this.bufferEmpty(...this.counters.ready); events = this.bufferEmpty(this.counters.ready);
return this.emitFrames(events, true);
} }
return this.emitFrames(events, true); return $q.resolve();
}); });
return this.chain; return this.chain;
@@ -185,7 +226,6 @@ function OutputStream ($q) {
this.hooks.onStop(); this.hooks.onStop();
} }
this.counters.ready.length = 0;
return $q.resolve(); return $q.resolve();
}); });