mirror of
https://github.com/ansible/awx.git
synced 2026-03-10 14:09:28 -02:30
move buffer mgmt to stream service
This commit is contained in:
@@ -20,50 +20,16 @@ let stream;
|
|||||||
|
|
||||||
let vm;
|
let vm;
|
||||||
|
|
||||||
const bufferState = [0, 0]; // [length, count]
|
|
||||||
const listeners = [];
|
const listeners = [];
|
||||||
const rx = [];
|
|
||||||
|
|
||||||
function bufferInit () {
|
|
||||||
rx.length = 0;
|
|
||||||
|
|
||||||
bufferState[0] = 0;
|
|
||||||
bufferState[1] = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
function bufferAdd (event) {
|
|
||||||
rx.push(event);
|
|
||||||
|
|
||||||
bufferState[0] += 1;
|
|
||||||
bufferState[1] += 1;
|
|
||||||
|
|
||||||
return bufferState[1];
|
|
||||||
}
|
|
||||||
|
|
||||||
function bufferEmpty (min, max) {
|
|
||||||
let count = 0;
|
|
||||||
let removed = [];
|
|
||||||
|
|
||||||
for (let i = bufferState[0] - 1; i >= 0; i--) {
|
|
||||||
if (rx[i].counter <= max) {
|
|
||||||
removed = removed.concat(rx.splice(i, 1));
|
|
||||||
count++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bufferState[0] -= count;
|
|
||||||
|
|
||||||
return removed;
|
|
||||||
}
|
|
||||||
|
|
||||||
let lockFrames;
|
let lockFrames;
|
||||||
function onFrames (events) {
|
function onFrames (events) {
|
||||||
|
events = slide.pushFrames(events);
|
||||||
|
|
||||||
if (lockFrames) {
|
if (lockFrames) {
|
||||||
events.forEach(bufferAdd);
|
|
||||||
return $q.resolve();
|
return $q.resolve();
|
||||||
}
|
}
|
||||||
|
|
||||||
events = slide.pushFrames(events);
|
|
||||||
const popCount = events.length - slide.getCapacity();
|
const popCount = events.length - slide.getCapacity();
|
||||||
const isAttached = events.length > 0;
|
const isAttached = events.length > 0;
|
||||||
|
|
||||||
@@ -481,7 +447,7 @@ function clear () {
|
|||||||
lockFollow = false;
|
lockFollow = false;
|
||||||
lockFrames = false;
|
lockFrames = false;
|
||||||
|
|
||||||
bufferInit();
|
stream.bufferInit();
|
||||||
status.init(resource);
|
status.init(resource);
|
||||||
slide.init(render, resource.events, scroll);
|
slide.init(render, resource.events, scroll);
|
||||||
status.subscribe(data => { vm.status = data.status; });
|
status.subscribe(data => { vm.status = data.status; });
|
||||||
@@ -543,8 +509,6 @@ function OutputIndexController (
|
|||||||
vm.debug = _debug;
|
vm.debug = _debug;
|
||||||
|
|
||||||
render.requestAnimationFrame(() => {
|
render.requestAnimationFrame(() => {
|
||||||
bufferInit();
|
|
||||||
|
|
||||||
status.init(resource);
|
status.init(resource);
|
||||||
slide.init(render, resource.events, scroll);
|
slide.init(render, resource.events, scroll);
|
||||||
render.init({ compile, toggles: vm.toggleLineEnabled });
|
render.init({ compile, toggles: vm.toggleLineEnabled });
|
||||||
@@ -564,8 +528,6 @@ function OutputIndexController (
|
|||||||
let showFollowTip = true;
|
let showFollowTip = true;
|
||||||
const rates = [];
|
const rates = [];
|
||||||
stream.init({
|
stream.init({
|
||||||
bufferAdd,
|
|
||||||
bufferEmpty,
|
|
||||||
onFrames,
|
onFrames,
|
||||||
onFrameRate (rate) {
|
onFrameRate (rate) {
|
||||||
rates.push(rate);
|
rates.push(rate);
|
||||||
|
|||||||
@@ -5,24 +5,16 @@ import {
|
|||||||
OUTPUT_PAGE_SIZE,
|
OUTPUT_PAGE_SIZE,
|
||||||
} from './constants';
|
} from './constants';
|
||||||
|
|
||||||
|
const rx = [];
|
||||||
|
|
||||||
function OutputStream ($q) {
|
function OutputStream ($q) {
|
||||||
this.init = ({ bufferAdd, bufferEmpty, onFrames, onFrameRate, onStop }) => {
|
this.init = ({ onFrames, onFrameRate, onStop }) => {
|
||||||
this.hooks = {
|
this.hooks = {
|
||||||
bufferAdd,
|
|
||||||
bufferEmpty,
|
|
||||||
onFrames,
|
onFrames,
|
||||||
onFrameRate,
|
onFrameRate,
|
||||||
onStop,
|
onStop,
|
||||||
};
|
};
|
||||||
|
|
||||||
this.counters = {
|
|
||||||
used: [],
|
|
||||||
ready: [],
|
|
||||||
min: 1,
|
|
||||||
max: 0,
|
|
||||||
final: null,
|
|
||||||
};
|
|
||||||
|
|
||||||
this.state = {
|
this.state = {
|
||||||
ending: false,
|
ending: false,
|
||||||
ended: false,
|
ended: false,
|
||||||
@@ -30,9 +22,44 @@ function OutputStream ($q) {
|
|||||||
|
|
||||||
this.lag = 0;
|
this.lag = 0;
|
||||||
this.chain = $q.resolve();
|
this.chain = $q.resolve();
|
||||||
|
|
||||||
this.factors = this.calcFactors(OUTPUT_PAGE_SIZE);
|
this.factors = this.calcFactors(OUTPUT_PAGE_SIZE);
|
||||||
|
|
||||||
this.setFramesPerRender();
|
this.setFramesPerRender();
|
||||||
|
this.bufferInit();
|
||||||
|
};
|
||||||
|
|
||||||
|
this.bufferInit = () => {
|
||||||
|
rx.length = 0;
|
||||||
|
|
||||||
|
this.counters = {
|
||||||
|
total: 0,
|
||||||
|
min: 0,
|
||||||
|
max: null,
|
||||||
|
final: null,
|
||||||
|
ready: [],
|
||||||
|
used: [],
|
||||||
|
missing: [],
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
this.bufferEmpty = (minReady, maxReady) => {
|
||||||
|
let removed = [];
|
||||||
|
|
||||||
|
for (let i = rx.length - 1; i >= 0; i--) {
|
||||||
|
if (rx[i].counter <= maxReady) {
|
||||||
|
removed = removed.concat(rx.splice(i, 1));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return removed;
|
||||||
|
};
|
||||||
|
|
||||||
|
this.bufferAdd = event => {
|
||||||
|
rx.push(event);
|
||||||
|
|
||||||
|
this.counters.total += 1;
|
||||||
|
|
||||||
|
return this.counters.total;
|
||||||
};
|
};
|
||||||
|
|
||||||
this.calcFactors = size => {
|
this.calcFactors = size => {
|
||||||
@@ -63,34 +90,32 @@ function OutputStream ($q) {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
this.updateCounterState = ({ counter }) => {
|
this.checkCounter = ({ counter }) => {
|
||||||
this.counters.used.push(counter);
|
this.counters.used.push(counter);
|
||||||
|
|
||||||
if (counter > this.counters.max) {
|
if (!this.counters.max || this.counters.max < counter) {
|
||||||
this.counters.max = counter;
|
this.counters.max = counter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let ready;
|
||||||
const missing = [];
|
const missing = [];
|
||||||
let minReady;
|
|
||||||
let maxReady;
|
|
||||||
|
|
||||||
for (let i = this.counters.min; i <= this.counters.max; i++) {
|
for (let i = this.counters.min; i <= this.counters.max; i++) {
|
||||||
if (this.counters.used.indexOf(i) === -1) {
|
if (this.counters.used.indexOf(i) === -1) {
|
||||||
missing.push(i);
|
missing.push(i);
|
||||||
} else if (missing.length === 0) {
|
|
||||||
maxReady = i;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (maxReady) {
|
if (missing.length === 0) {
|
||||||
minReady = this.counters.min;
|
ready = this.counters.max;
|
||||||
|
} else {
|
||||||
this.counters.min = maxReady + 1;
|
ready = missing[0] - 1;
|
||||||
this.counters.used = this.counters.used.filter(c => c > maxReady);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.counters.ready = [this.counters.min, ready];
|
||||||
|
this.counters.min = ready + 1;
|
||||||
|
this.counters.used = this.counters.used.filter(c => c > ready);
|
||||||
this.counters.missing = missing;
|
this.counters.missing = missing;
|
||||||
this.counters.ready = [minReady, maxReady];
|
|
||||||
|
|
||||||
return this.counters.ready;
|
return this.counters.ready;
|
||||||
};
|
};
|
||||||
@@ -105,8 +130,8 @@ function OutputStream ($q) {
|
|||||||
this.counters.final = data.counter;
|
this.counters.final = data.counter;
|
||||||
}
|
}
|
||||||
|
|
||||||
const [minReady, maxReady] = this.updateCounterState(data);
|
const [minReady, maxReady] = this.checkCounter(data);
|
||||||
const count = this.hooks.bufferAdd(data);
|
const count = this.bufferAdd(data);
|
||||||
|
|
||||||
if (count % OUTPUT_PAGE_SIZE === 0) {
|
if (count % OUTPUT_PAGE_SIZE === 0) {
|
||||||
this.setFramesPerRender();
|
this.setFramesPerRender();
|
||||||
@@ -121,7 +146,7 @@ function OutputStream ($q) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const isLastFrame = this.state.ending && (maxReady >= this.counters.final);
|
const isLastFrame = this.state.ending && (maxReady >= this.counters.final);
|
||||||
const events = this.hooks.bufferEmpty(minReady, maxReady);
|
const events = this.bufferEmpty(minReady, maxReady);
|
||||||
|
|
||||||
return this.emitFrames(events, isLastFrame);
|
return this.emitFrames(events, isLastFrame);
|
||||||
})
|
})
|
||||||
@@ -142,7 +167,7 @@ function OutputStream ($q) {
|
|||||||
|
|
||||||
let events = [];
|
let events = [];
|
||||||
if (this.counters.ready.length > 0) {
|
if (this.counters.ready.length > 0) {
|
||||||
events = this.hooks.bufferEmpty(...this.counters.ready);
|
events = this.bufferEmpty(...this.counters.ready);
|
||||||
}
|
}
|
||||||
|
|
||||||
return this.emitFrames(events, true);
|
return this.emitFrames(events, true);
|
||||||
|
|||||||
Reference in New Issue
Block a user