Add independent stream service

This commit is contained in:
gconsidine 2018-03-08 16:59:30 -05:00 committed by Jake McDermott
parent 7acc99cf15
commit 189963ae83
5 changed files with 230 additions and 99 deletions

View File

@ -1,6 +1,3 @@
const JOB_START = 'playbook_on_start';
const JOB_END = 'playbook_on_stats';
let vm;
let $compile;
let $scope;
@ -8,17 +5,20 @@ let $q;
let page;
let render;
let scroll;
let stream;
let resource;
let $state;
let qs;
let chain;
let chainLength;
function JobsIndexController (
_resource_,
_page_,
_scroll_,
_render_,
_stream_,
_$scope_,
_$compile_,
_$q_,
@ -35,6 +35,7 @@ function JobsIndexController (
page = _page_;
scroll = _scroll_;
render = _render_;
stream = _stream_;
// Development helper(s)
vm.clear = devClear;
@ -53,17 +54,6 @@ function JobsIndexController (
vm.expand = expand;
vm.isExpanded = true;
// Real-time (active between JOB_START and JOB_END events only)
vm.stream = {
active: false,
rendering: false,
paused: false
};
const stream = false; // TODO: Set in route
chain = $q.resolve();
// search
$state = _$state_;
qs = _qs_;
@ -83,8 +73,10 @@ function JobsIndexController (
render.requestAnimationFrame(() => init());
}
function init (stream) {
page.init(resource);
function init (pageMode) {
page.init({
resource
});
render.init({
get: () => resource.model.get(`related.${resource.related}.results`),
@ -97,76 +89,24 @@ function init (stream) {
next
});
if (stream) {
$scope.$on(resource.ws.namespace, process);
} else {
stream.init({
page,
scroll,
resource,
render: events => shift().then(() => append(events, true)),
listen: (namespace, listener) => {
$scope.$on(namespace, (scope, data) => listener(data));
}
});
if (pageMode) {
next();
}
}
function process (scope, data) {
chain = chain.then(() => {
if (data.event === JOB_START) {
vm.stream.active = true;
scroll.lock();
} else if (data.event === JOB_END) {
vm.stream.active = false;
}
const pageAdded = page.addToBuffer(data);
if (pageAdded && !scroll.isLocked()) {
vm.stream.paused = true;
}
if (vm.stream.paused && scroll.isLocked()) {
vm.stream.paused = false;
}
if (vm.stream.rendering || vm.stream.paused) {
return;
}
const events = page.emptyBuffer();
return renderStream(events);
})
}
function renderStream (events) {
vm.stream.rendering = true;
return shift()
.then(() => append(events, true))
.then(() => {
if (scroll.isLocked()) {
scroll.setScrollPosition(scroll.getScrollHeight());
}
if (!vm.stream.active) {
const buffer = page.emptyBuffer();
if (buffer.length) {
return renderStream(buffer);
} else {
vm.stream.rendering = false;
scroll.unlock();
}
} else {
vm.stream.rendering = false;
}
});
}
function devClear () {
init(true);
function devClear (pageMode) {
init(pageMode);
render.clear();
vm.stream = {
active: false,
rendering: false,
paused: false
};
}
function next () {
@ -257,14 +197,12 @@ function scrollHome () {
}
function scrollEnd () {
if (scroll.isLocked()) {
page.setBookmark();
scroll.unlock();
return;
} else if (!scroll.isLocked() && vm.stream.active) {
page.removeBookmark();
scroll.lock();
if (stream.isActive()) {
if (stream.isPaused()) {
stream.resume();
} else {
stream.pause();
}
return;
}
@ -339,6 +277,7 @@ function toggle (uuid, menu) {
lines.removeClass('hidden');
}
}
//
// Search
@ -402,6 +341,7 @@ JobsIndexController.$inject = [
'JobPageService',
'JobScrollService',
'JobRenderService',
'JobStreamService',
'$scope',
'$compile',
'$q',

View File

@ -7,6 +7,7 @@ import PageService from '~features/output/page.service';
import RenderService from '~features/output/render.service';
import ScrollService from '~features/output/scroll.service';
import SearchKeyDirective from '~features/output/search-key.directive';
import StreamService from '~features/output/stream.service';
const Template = require('~features/output/index.view.html');
@ -190,6 +191,7 @@ angular
.service('JobStrings', Strings)
.service('JobPageService', PageService)
.service('JobScrollService', ScrollService)
.service('JobStreamService', StreamService)
.directive('atSearchKey', SearchKeyDirective)
.run(JobsRun);

View File

@ -1,7 +1,8 @@
<div class="container-fluid">
<div class="col-md-4">
<at-panel>
<p><button class="btn" ng-click="vm.clear()">clear</button></p>
<p><button class="btn" ng-click="vm.clear()">Stream Mode</button></p>
<p><button class="btn" ng-click="vm.clear(true)">Page Mode</button></p>
</at-panel>
</div>

View File

@ -1,5 +1,5 @@
function JobPageService ($q) {
this.init = resource => {
this.init = ({ resource }) => {
this.resource = resource;
this.page = {
@ -54,12 +54,9 @@ function JobPageService ($q) {
reference.state.count++;
};
this.addToPageCache = (index, event, reference) => {
reference.cache[index].events.push(event);
};
this.addToBuffer = event => {
const reference = this.getReference();
const index = reference.cache.length - 1;
let pageAdded = false;
if (this.result.count % this.page.size === 0) {
@ -70,9 +67,10 @@ function JobPageService ($q) {
}
this.trimBuffer();
pageAdded = true;
} else {
this.addToPageCache(reference.cache.length - 1, event, reference);
reference.cache[index].events.push(event);
}
this.buffer.count++;
@ -97,6 +95,14 @@ function JobPageService ($q) {
}
};
this.isBufferFull = () => {
if (this.buffer.count === 2) {
return true;
}
return false;
}
this.emptyBuffer = () => {
const reference = this.getReference();
let data = [];
@ -183,9 +189,9 @@ function JobPageService ($q) {
return;
}
this.bookmark.state.first = this.page.state.first;
this.bookmark.state.last = this.page.state.last;
this.bookmark.state.current = this.page.state.current;
this.bookmark.state.first = this.page.state.first - 1;
this.bookmark.state.last = this.page.state.last - 1;
this.bookmark.state.current = this.page.state.current - 1;
this.bookmark.cache = JSON.parse(JSON.stringify(this.page.cache));
this.bookmark.set = true;
this.bookmark.pending = false;

View File

@ -0,0 +1,182 @@
const JOB_START = 'playbook_on_start';
const JOB_END = 'playbook_on_stats';
const MAX_LAG = 120;
function JobStreamService ($q) {
this.init = ({ resource, scroll, page, render, listen }) => {
this.resource = resource;
this.scroll = scroll;
this.page = page;
this.lag = 0;
this.count = 0;
this.pageCount = 0;
this.chain = $q.resolve();
this.factors = this.getBatchFactors(this.resource.page.size);
this.state = {
started: false,
paused: false,
pausing: false,
resuming: false,
ending: false,
ended: false
};
this.hooks = {
render,
listen
};
this.hooks.listen(resource.ws.namespace, this.listen);
};
this.getBatchFactors = size => {
const factors = [1];
for (let i = 2; i <= size / 2; i++) {
if (size % i === 0) {
factors.push(i);
}
}
factors.push(size);
return factors;
};
this.getBatchFactorIndex = () => {
const index = Math.floor((this.lag / MAX_LAG) * this.factors.length);
return index > this.factors.length - 1 ? this.factors.length - 1 : index;
};
this.setBatchFrameCount = () => {
const index = this.getBatchFactorIndex();
this.framesPerRender = this.factors[index];
};
this.buffer = data => {
const pageAdded = this.page.addToBuffer(data);
this.pageCount++;
if (pageAdded) {
this.setBatchFrameCount();
if (this.isPausing()) {
this.pause(true);
} else if (this.isResuming()) {
this.resume(true);
}
}
};
this.listen = data => {
this.lag++;
this.chain = this.chain
.then(() => {
if (data.event === JOB_START) {
this.start();
} else if (data.event === JOB_END) {
if (this.isPaused()) {
this.end(true);
} else {
this.end();
}
}
this.buffer(data);
this.count++;
if (this.isPaused() || !this.isBatchFull()) {
return $q.resolve();
}
const events = this.page.emptyBuffer();
this.count -= events.length;
return this.renderFrame(events);
})
.then(() => --this.lag);
};
this.renderFrame = events => {
return this.hooks.render(events)
.then(() => {
if (this.scroll.isLocked()) {
this.scroll.setScrollPosition(this.scroll.getScrollHeight());
}
if (this.isEnding()) {
const lastEvents = this.page.emptyBuffer();
if (lastEvents.length) {
return this.renderFrame(lastEvents);
}
this.end(true);
}
return $q.resolve();
});
};
this.resume = done => {
if (done) {
this.state.resuming = false;
this.state.paused = false;
return;
}
this.scroll.lock();
this.state.resuming = true;
this.page.removeBookmark();
};
this.pause = done => {
if (done) {
this.state.pausing = false;
this.state.paused = true;
this.scroll.resume();
return;
}
this.scroll.unlock();
this.scroll.pause();
this.state.pausing = true;
this.page.setBookmark();
};
this.start = () => {
this.state.started = true;
this.scroll.lock();
};
this.end = done => {
if (done) {
this.state.ending = false;
this.state.ended = true;
this.scroll.unlock();
return;
}
this.state.ending = true;
};
this.isBatchFull = () => this.count % this.framesPerRender === 0;
this.isPaused = () => this.state.paused;
this.isPausing = () => this.state.pausing;
this.isResuming = () => this.state.resuming;
this.isActive = () => this.state.started && !this.state.ended;
this.isEnding = () => this.state.ending;
this.isDone = () => this.state.ended;
}
JobStreamService.$inject = ['$q'];
export default JobStreamService;