Merge pull request #2117 from jakemcdermott/job-results/window

apply a windowing function to buffered event stream
This commit is contained in:
Jake McDermott 2018-06-11 18:31:19 -04:00 committed by GitHub
commit ce411a21c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 29 deletions

View File

@ -32,13 +32,23 @@ function bufferAdd (event) {
bufferState[0] += 1;
bufferState[1] += 1;
return bufferState;
return bufferState[1];
}
function bufferEmpty () {
bufferState[0] = 0;
function bufferEmpty (min, max) {
let count = 0;
let removed = [];
return rx.splice(0, rx.length);
for (let i = bufferState[0] - 1; i >= 0; i--) {
if (rx[i].counter <= max) {
removed = removed.concat(rx.splice(i, 1));
count++;
}
}
bufferState[0] -= count;
return removed;
}
function onFrames (events) {

View File

@ -309,10 +309,13 @@ function SlidingWindowService ($q) {
return Number.isFinite(head) ? head : 0;
};
this.compareRange = (a, b) => a[0] === b[0] && a[1] === b[1];
this.getRange = () => [this.getHeadCounter(), this.getTailCounter()];
this.getMaxCounter = () => {
const counter = this.api.getMaxCounter();
this.getMaxCounter = () => this.api.getMaxCounter();
return Number.isFinite(counter) ? counter : this.getTailCounter();
};
this.getRange = () => [this.getHeadCounter(), this.getTailCounter()];
this.getRecordCount = () => Object.keys(this.records).length;
this.getCapacity = () => EVENT_LIMIT - this.getRecordCount();
}

View File

@ -16,6 +16,7 @@ function OutputStream ($q) {
used: [],
min: 1,
max: 0,
last: null,
ready: false,
};
@ -66,28 +67,30 @@ function OutputStream ($q) {
}
const missing = [];
const ready = [];
let minReady;
let maxReady;
for (let i = this.counters.min; i < this.counters.max; i++) {
for (let i = this.counters.min; i <= this.counters.max; i++) {
if (this.counters.used.indexOf(i) === -1) {
missing.push(i);
} else if (missing.length === 0) {
ready.push(i);
maxReady = i;
}
}
if (missing.length === 0) {
if (maxReady) {
minReady = this.counters.min;
this.counters.ready = true;
this.counters.min = this.counters.max + 1;
this.counters.used = [];
this.counters.min = maxReady + 1;
this.counters.used = this.counters.used.filter(c => c > maxReady);
} else {
this.counters.ready = false;
}
this.counters.missing = missing;
this.counters.readyLines = ready;
return this.counters.ready;
return [minReady, maxReady];
};
this.pushJobEvent = data => {
@ -97,40 +100,36 @@ function OutputStream ($q) {
.then(() => {
if (data.event === JOB_END) {
this.state.ending = true;
this.counters.last = data.counter;
}
const isMissingCounters = !this.updateCounterState(data);
const [length, count] = this.hooks.bufferAdd(data);
const [minReady, maxReady] = this.updateCounterState(data);
const count = this.hooks.bufferAdd(data);
if (count % PAGE_SIZE === 0) {
this.setFramesPerRender();
}
const isBatchReady = length % this.framesPerRender === 0;
const isReady = this.state.ended || (!isMissingCounters && isBatchReady);
const isReady = maxReady && (this.state.ending ||
(maxReady - minReady) % this.framesPerRender === 0);
if (!isReady) {
return $q.resolve();
}
const events = this.hooks.bufferEmpty();
const isLastFrame = this.state.ending && (maxReady >= this.counters.last);
const events = this.hooks.bufferEmpty(minReady, maxReady);
return this.emitFrames(events);
return this.emitFrames(events, isLastFrame);
})
.then(() => --this.lag);
return this.chain;
};
this.emitFrames = events => this.hooks.onFrames(events)
this.emitFrames = (events, last) => this.hooks.onFrames(events)
.then(() => {
if (this.state.ending) {
const lastEvents = this.hooks.bufferEmpty();
if (lastEvents.length) {
return this.emitFrames(lastEvents);
}
if (last) {
this.state.ending = false;
this.state.ended = true;