From 6a59356200dd144d874206e389003dc1cf4b077a Mon Sep 17 00:00:00 2001 From: Jake McDermott Date: Thu, 7 Jun 2018 18:51:09 -0400 Subject: [PATCH] apply a windowing function to buffered event stream --- .../features/output/index.controller.js | 18 ++++++-- .../client/features/output/slide.service.js | 9 ++-- .../client/features/output/stream.service.js | 43 +++++++++---------- 3 files changed, 41 insertions(+), 29 deletions(-) diff --git a/awx/ui/client/features/output/index.controller.js b/awx/ui/client/features/output/index.controller.js index 6fc0746571..64d53b4faa 100644 --- a/awx/ui/client/features/output/index.controller.js +++ b/awx/ui/client/features/output/index.controller.js @@ -32,13 +32,23 @@ function bufferAdd (event) { bufferState[0] += 1; bufferState[1] += 1; - return bufferState; + return bufferState[1]; } -function bufferEmpty () { - bufferState[0] = 0; +function bufferEmpty (min, max) { + let count = 0; + let removed = []; - return rx.splice(0, rx.length); + for (let i = bufferState[0] - 1; i >= 0; i--) { + if (rx[i].counter <= max) { + removed = removed.concat(rx.splice(i, 1)); + count++; + } + } + + bufferState[0] -= count; + + return removed; } function onFrames (events) { diff --git a/awx/ui/client/features/output/slide.service.js b/awx/ui/client/features/output/slide.service.js index 96b32b2703..507a277733 100644 --- a/awx/ui/client/features/output/slide.service.js +++ b/awx/ui/client/features/output/slide.service.js @@ -309,10 +309,13 @@ function SlidingWindowService ($q) { return Number.isFinite(head) ? head : 0; }; - this.compareRange = (a, b) => a[0] === b[0] && a[1] === b[1]; - this.getRange = () => [this.getHeadCounter(), this.getTailCounter()]; + this.getMaxCounter = () => { + const counter = this.api.getMaxCounter(); - this.getMaxCounter = () => this.api.getMaxCounter(); + return Number.isFinite(counter) ? counter : this.getTailCounter(); + }; + + this.getRange = () => [this.getHeadCounter(), this.getTailCounter()]; this.getRecordCount = () => Object.keys(this.records).length; this.getCapacity = () => EVENT_LIMIT - this.getRecordCount(); } diff --git a/awx/ui/client/features/output/stream.service.js b/awx/ui/client/features/output/stream.service.js index da73f70c23..c710a1fc1a 100644 --- a/awx/ui/client/features/output/stream.service.js +++ b/awx/ui/client/features/output/stream.service.js @@ -16,6 +16,7 @@ function OutputStream ($q) { used: [], min: 1, max: 0, + last: null, ready: false, }; @@ -66,28 +67,30 @@ function OutputStream ($q) { } const missing = []; - const ready = []; + let minReady; + let maxReady; - for (let i = this.counters.min; i < this.counters.max; i++) { + for (let i = this.counters.min; i <= this.counters.max; i++) { if (this.counters.used.indexOf(i) === -1) { missing.push(i); } else if (missing.length === 0) { - ready.push(i); + maxReady = i; } } - if (missing.length === 0) { + if (maxReady) { + minReady = this.counters.min; + this.counters.ready = true; - this.counters.min = this.counters.max + 1; - this.counters.used = []; + this.counters.min = maxReady + 1; + this.counters.used = this.counters.used.filter(c => c > maxReady); } else { this.counters.ready = false; } this.counters.missing = missing; - this.counters.readyLines = ready; - return this.counters.ready; + return [minReady, maxReady]; }; this.pushJobEvent = data => { @@ -97,40 +100,36 @@ function OutputStream ($q) { .then(() => { if (data.event === JOB_END) { this.state.ending = true; + this.counters.last = data.counter; } - const isMissingCounters = !this.updateCounterState(data); - const [length, count] = this.hooks.bufferAdd(data); + const [minReady, maxReady] = this.updateCounterState(data); + const count = this.hooks.bufferAdd(data); if (count % PAGE_SIZE === 0) { this.setFramesPerRender(); } - const isBatchReady = length % this.framesPerRender === 0; - const isReady = this.state.ended || (!isMissingCounters && isBatchReady); + const isReady = maxReady && (this.state.ending || + (maxReady - minReady) % this.framesPerRender === 0); if (!isReady) { return $q.resolve(); } - const events = this.hooks.bufferEmpty(); + const isLastFrame = this.state.ending && (maxReady >= this.counters.last); + const events = this.hooks.bufferEmpty(minReady, maxReady); - return this.emitFrames(events); + return this.emitFrames(events, isLastFrame); }) .then(() => --this.lag); return this.chain; }; - this.emitFrames = events => this.hooks.onFrames(events) + this.emitFrames = (events, last) => this.hooks.onFrames(events) .then(() => { - if (this.state.ending) { - const lastEvents = this.hooks.bufferEmpty(); - - if (lastEvents.length) { - return this.emitFrames(lastEvents); - } - + if (last) { this.state.ending = false; this.state.ended = true;