apply a windowing function to buffered event stream

This commit is contained in:
Jake McDermott
2018-06-07 18:51:09 -04:00
parent 18384bc509
commit 6a59356200
3 changed files with 41 additions and 29 deletions

View File

@@ -16,6 +16,7 @@ function OutputStream ($q) {
used: [],
min: 1,
max: 0,
last: null,
ready: false,
};
@@ -66,28 +67,30 @@ function OutputStream ($q) {
}
const missing = [];
const ready = [];
let minReady;
let maxReady;
for (let i = this.counters.min; i < this.counters.max; i++) {
for (let i = this.counters.min; i <= this.counters.max; i++) {
if (this.counters.used.indexOf(i) === -1) {
missing.push(i);
} else if (missing.length === 0) {
ready.push(i);
maxReady = i;
}
}
if (missing.length === 0) {
if (maxReady) {
minReady = this.counters.min;
this.counters.ready = true;
this.counters.min = this.counters.max + 1;
this.counters.used = [];
this.counters.min = maxReady + 1;
this.counters.used = this.counters.used.filter(c => c > maxReady);
} else {
this.counters.ready = false;
}
this.counters.missing = missing;
this.counters.readyLines = ready;
return this.counters.ready;
return [minReady, maxReady];
};
this.pushJobEvent = data => {
@@ -97,40 +100,36 @@ function OutputStream ($q) {
.then(() => {
if (data.event === JOB_END) {
this.state.ending = true;
this.counters.last = data.counter;
}
const isMissingCounters = !this.updateCounterState(data);
const [length, count] = this.hooks.bufferAdd(data);
const [minReady, maxReady] = this.updateCounterState(data);
const count = this.hooks.bufferAdd(data);
if (count % PAGE_SIZE === 0) {
this.setFramesPerRender();
}
const isBatchReady = length % this.framesPerRender === 0;
const isReady = this.state.ended || (!isMissingCounters && isBatchReady);
const isReady = maxReady && (this.state.ending ||
(maxReady - minReady) % this.framesPerRender === 0);
if (!isReady) {
return $q.resolve();
}
const events = this.hooks.bufferEmpty();
const isLastFrame = this.state.ending && (maxReady >= this.counters.last);
const events = this.hooks.bufferEmpty(minReady, maxReady);
return this.emitFrames(events);
return this.emitFrames(events, isLastFrame);
})
.then(() => --this.lag);
return this.chain;
};
this.emitFrames = events => this.hooks.onFrames(events)
this.emitFrames = (events, last) => this.hooks.onFrames(events)
.then(() => {
if (this.state.ending) {
const lastEvents = this.hooks.bufferEmpty();
if (lastEvents.length) {
return this.emitFrames(lastEvents);
}
if (last) {
this.state.ending = false;
this.state.ended = true;