Merge pull request #1755 from jakemcdermott/1747

always render initial set of job events on initialization for still-running jobs
This commit is contained in:
Jake McDermott
2018-05-14 11:41:38 -04:00
committed by GitHub
7 changed files with 141 additions and 55 deletions

View File

@@ -95,7 +95,7 @@ class ReplayJobEvents():
raise RuntimeError("Job is of type {} and replay is not yet supported.".format(type(job))) raise RuntimeError("Job is of type {} and replay is not yet supported.".format(type(job)))
sys.exit(1) sys.exit(1)
def run(self, job_id, speed=1.0, verbosity=0): def run(self, job_id, speed=1.0, verbosity=0, skip=0):
stats = { stats = {
'events_ontime': { 'events_ontime': {
'total': 0, 'total': 0,
@@ -126,7 +126,10 @@ class ReplayJobEvents():
sys.exit(1) sys.exit(1)
je_previous = None je_previous = None
for je_current in job_events: for n, je_current in enumerate(job_events):
if n < skip:
continue
if not je_previous: if not je_previous:
stats['recording_start'] = je_current.created stats['recording_start'] = je_current.created
self.start(je_current.created) self.start(je_current.created)
@@ -163,21 +166,25 @@ class ReplayJobEvents():
stats['events_total'] += 1 stats['events_total'] += 1
je_previous = je_current je_previous = je_current
stats['replay_end'] = self.now()
stats['replay_duration'] = (stats['replay_end'] - stats['replay_start']).total_seconds()
stats['replay_start'] = stats['replay_start'].isoformat()
stats['replay_end'] = stats['replay_end'].isoformat()
stats['recording_end'] = je_current.created if stats['events_total'] > 2:
stats['recording_duration'] = (stats['recording_end'] - stats['recording_start']).total_seconds() stats['replay_end'] = self.now()
stats['recording_start'] = stats['recording_start'].isoformat() stats['replay_duration'] = (stats['replay_end'] - stats['replay_start']).total_seconds()
stats['recording_end'] = stats['recording_end'].isoformat() stats['replay_start'] = stats['replay_start'].isoformat()
stats['replay_end'] = stats['replay_end'].isoformat()
stats['recording_end'] = je_current.created
stats['recording_duration'] = (stats['recording_end'] - stats['recording_start']).total_seconds()
stats['recording_start'] = stats['recording_start'].isoformat()
stats['recording_end'] = stats['recording_end'].isoformat()
stats['events_ontime']['percentage'] = (stats['events_ontime']['total'] / float(stats['events_total'])) * 100.00
stats['events_late']['percentage'] = (stats['events_late']['total'] / float(stats['events_total'])) * 100.00
stats['events_distance_average'] = stats['events_distance_total'] / stats['events_total']
stats['events_late']['lateness_average'] = stats['events_late']['lateness_total'] / stats['events_late']['total']
else:
stats = {'events_total': stats['events_total']}
stats['events_ontime']['percentage'] = (stats['events_ontime']['total'] / float(stats['events_total'])) * 100.00
stats['events_late']['percentage'] = (stats['events_late']['total'] / float(stats['events_total'])) * 100.00
stats['events_distance_average'] = stats['events_distance_total'] / stats['events_total']
stats['events_late']['lateness_average'] = stats['events_late']['lateness_total'] / stats['events_late']['total']
if verbosity >= 2: if verbosity >= 2:
print(json.dumps(stats, indent=4, sort_keys=True)) print(json.dumps(stats, indent=4, sort_keys=True))
@@ -191,11 +198,14 @@ class Command(BaseCommand):
help='Id of the job to replay (job or adhoc)') help='Id of the job to replay (job or adhoc)')
parser.add_argument('--speed', dest='speed', type=int, metavar='s', parser.add_argument('--speed', dest='speed', type=int, metavar='s',
help='Speedup factor.') help='Speedup factor.')
parser.add_argument('--skip', dest='skip', type=int, metavar='k',
help='Number of events to skip.')
def handle(self, *args, **options): def handle(self, *args, **options):
job_id = options.get('job_id') job_id = options.get('job_id')
speed = options.get('speed') or 1 speed = options.get('speed') or 1
verbosity = options.get('verbosity') or 0 verbosity = options.get('verbosity') or 0
skip = options.get('skip') or 0
replayer = ReplayJobEvents() replayer = ReplayJobEvents()
replayer.run(job_id, speed, verbosity) replayer.run(job_id, speed, verbosity, skip)

View File

@@ -38,6 +38,12 @@ function JobEventEngine ($q) {
}; };
}; };
this.setMinLine = min => {
if (min > this.lines.min) {
this.lines.min = min;
}
};
this.getBatchFactors = size => { this.getBatchFactors = size => {
const factors = [1]; const factors = [1];
@@ -140,10 +146,6 @@ function JobEventEngine ($q) {
this.renderFrame = events => this.hooks.onEventFrame(events) this.renderFrame = events => this.hooks.onEventFrame(events)
.then(() => { .then(() => {
if (this.scroll.isLocked()) {
this.scroll.scrollToBottom();
}
if (this.isEnding()) { if (this.isEnding()) {
const lastEvents = this.page.emptyBuffer(); const lastEvents = this.page.emptyBuffer();

View File

@@ -7,8 +7,11 @@ let resource;
let scroll; let scroll;
let engine; let engine;
let status; let status;
let $http;
let vm; let vm;
let streaming;
let listeners = [];
function JobsIndexController ( function JobsIndexController (
_resource_, _resource_,
@@ -20,6 +23,7 @@ function JobsIndexController (
_$compile_, _$compile_,
_$q_, _$q_,
_status_, _status_,
_$http_,
) { ) {
vm = this || {}; vm = this || {};
@@ -33,6 +37,7 @@ function JobsIndexController (
render = _render_; render = _render_;
engine = _engine_; engine = _engine_;
status = _status_; status = _status_;
$http = _$http_;
// Development helper(s) // Development helper(s)
vm.clear = devClear; vm.clear = devClear;
@@ -67,7 +72,6 @@ function init () {
}); });
render.init({ render.init({
get: () => resource.model.get(`related.${resource.related}.results`),
compile: html => $compile(html)($scope), compile: html => $compile(html)($scope),
isStreamActive: engine.isActive, isStreamActive: engine.isActive,
}); });
@@ -90,32 +94,72 @@ function init () {
status.setJobStatus('running'); status.setJobStatus('running');
}, },
onStop () { onStop () {
stopListening();
status.updateStats(); status.updateStats();
status.dispatch(); status.dispatch();
} }
}); });
$scope.$on(resource.ws.events, handleJobEvent); streaming = false;
$scope.$on(resource.ws.status, handleStatusEvent); return next().then(() => startListening());
if (!status.state.running) {
next();
}
} }
function handleStatusEvent (scope, data) { function stopListening () {
listeners.forEach(deregister => deregister());
listeners = [];
}
function startListening () {
stopListening();
listeners.push($scope.$on(resource.ws.events, (scope, data) => handleJobEvent(data)));
listeners.push($scope.$on(resource.ws.status, (scope, data) => handleStatusEvent(data)));
}
function handleStatusEvent (data) {
status.pushStatusEvent(data); status.pushStatusEvent(data);
} }
function handleJobEvent (scope, data) { function handleJobEvent (data) {
engine.pushJobEvent(data); streaming = streaming || attachToRunningJob();
streaming.then(() => {
status.pushJobEvent(data); engine.pushJobEvent(data);
status.pushJobEvent(data);
});
} }
function devClear (pageMode) { function attachToRunningJob () {
init(pageMode); const target = `${resource.model.get('url')}${resource.related}/`;
render.clear(); const params = { order_by: '-created', page_size: resource.page.size };
scroll.pause();
return render.clear()
.then(() => $http.get(target, { params }))
.then(res => {
const { results } = res.data;
const minLine = 1 + Math.max(...results.map(event => event.end_line));
const maxCount = Math.max(...results.map(event => event.counter));
const lastPage = resource.model.updateCount(maxCount);
page.emptyCache(lastPage);
page.addPage(lastPage, [], true);
engine.setMinLine(minLine);
if (resource.model.page.current === lastPage) {
return $q.resolve();
}
return append(results);
})
.then(() => {
scroll.setScrollPosition(scroll.getScrollHeight());
scroll.resume();
return $q.resolve();
});
} }
function next () { function next () {
@@ -281,6 +325,10 @@ function toggleExpanded () {
vm.expanded = !vm.expanded; vm.expanded = !vm.expanded;
} }
function devClear () {
render.clear().then(() => init());
}
// function showHostDetails (id) { // function showHostDetails (id) {
// jobEvent.request('get', id) // jobEvent.request('get', id)
// .then(() => { // .then(() => {
@@ -331,6 +379,7 @@ JobsIndexController.$inject = [
'$compile', '$compile',
'$q', '$q',
'JobStatusService', 'JobStatusService',
'$http',
]; ];
module.exports = JobsIndexController; module.exports = JobsIndexController;

View File

@@ -70,13 +70,20 @@ function resolveResource (
return null; return null;
} }
const params = { page_size: PAGE_SIZE, order_by: 'start_line' }; const params = {
const config = { pageCache: PAGE_CACHE, pageLimit: PAGE_LIMIT, params }; page_size: PAGE_SIZE,
order_by: 'start_line',
};
const config = {
params,
pageCache: PAGE_CACHE,
pageLimit: PAGE_LIMIT,
};
if (job_event_search) { // eslint-disable-line camelcase if (job_event_search) { // eslint-disable-line camelcase
const queryParams = qs.encodeQuerysetObject(qs.decodeArr(job_event_search)); const query = qs.encodeQuerysetObject(qs.decodeArr(job_event_search));
Object.assign(config.params, query);
Object.assign(config.params, queryParams);
} }
Wait('start'); Wait('start');
@@ -89,7 +96,6 @@ function resolveResource (
} }
promises.push(model.extend('get', related, config)); promises.push(model.extend('get', related, config));
return Promise.all(promises); return Promise.all(promises);
}) })
.then(([stats, model]) => ({ .then(([stats, model]) => ({
@@ -107,18 +113,19 @@ function resolveResource (
size: PAGE_SIZE, size: PAGE_SIZE,
pageLimit: PAGE_LIMIT pageLimit: PAGE_LIMIT
} }
})) }));
.finally(() => Wait('stop'));
if (!handleErrors) { if (!handleErrors) {
return resourcePromise; return resourcePromise
.finally(() => Wait('stop'));
} }
return resourcePromise return resourcePromise
.catch(({ data, status }) => { .catch(({ data, status }) => {
$state.go($state.current, $state.params, { reload: true });
qs.error(data, status); qs.error(data, status);
}); return $state.go($state.current, $state.params, { reload: true });
})
.finally(() => Wait('stop'));
} }
function resolveWebSocketConnection ($stateParams, SocketService) { function resolveWebSocketConnection ($stateParams, SocketService) {

View File

@@ -30,11 +30,11 @@ const re = new RegExp(pattern);
const hasAnsi = input => re.test(input); const hasAnsi = input => re.test(input);
function JobRenderService ($q, $sce, $window) { function JobRenderService ($q, $sce, $window) {
this.init = ({ compile, apply, isStreamActive }) => { this.init = ({ compile, isStreamActive }) => {
this.parent = null; this.parent = null;
this.record = {}; this.record = {};
this.el = $(ELEMENT_TBODY); this.el = $(ELEMENT_TBODY);
this.hooks = { isStreamActive, compile, apply }; this.hooks = { isStreamActive, compile };
}; };
this.sortByLineNumber = (a, b) => { this.sortByLineNumber = (a, b) => {
@@ -239,8 +239,6 @@ function JobRenderService ($q, $sce, $window) {
return list; return list;
}; };
this.getEvents = () => this.hooks.get();
this.insert = (events, insert) => { this.insert = (events, insert) => {
const result = this.transformEventGroup(events); const result = this.transformEventGroup(events);
const html = this.trustHtml(result.html); const html = this.trustHtml(result.html);

View File

@@ -49,48 +49,60 @@ function JobStatusService (moment, message) {
}; };
this.pushStatusEvent = data => { this.pushStatusEvent = data => {
const isJobEvent = (this.job === data.unified_job_id); const isJobStatusEvent = (this.job === data.unified_job_id);
const isProjectEvent = (this.project && (this.project === data.project_id)); const isProjectStatusEvent = (this.project && (this.project === data.project_id));
if (isJobEvent) { if (isJobStatusEvent) {
this.setJobStatus(data.status); this.setJobStatus(data.status);
} else if (isProjectEvent) { this.dispatch();
} else if (isProjectStatusEvent) {
this.setProjectStatus(data.status); this.setProjectStatus(data.status);
this.setProjectUpdateId(data.unified_job_id); this.setProjectUpdateId(data.unified_job_id);
this.dispatch();
} }
}; };
this.pushJobEvent = data => { this.pushJobEvent = data => {
const isLatest = ((!this.counter) || (data.counter > this.counter)); const isLatest = ((!this.counter) || (data.counter > this.counter));
let changed = false;
if (!this.active && !(data.event === JOB_END)) { if (!this.active && !(data.event === JOB_END)) {
this.active = true; this.active = true;
this.setJobStatus('running'); this.setJobStatus('running');
changed = true;
} }
if (isLatest) { if (isLatest) {
this.counter = data.counter; this.counter = data.counter;
this.latestTime = data.created; this.latestTime = data.created;
this.setElapsed(moment(data.created).diff(this.created, 'seconds')); this.setElapsed(moment(data.created).diff(this.created, 'seconds'));
changed = true;
} }
if (data.event === JOB_START) { if (data.event === JOB_START) {
this.setStarted(this.state.started || data.created); this.setStarted(this.state.started || data.created);
changed = true;
} }
if (data.event === PLAY_START) { if (data.event === PLAY_START) {
this.state.counts.plays++; this.state.counts.plays++;
changed = true;
} }
if (data.event === TASK_START) { if (data.event === TASK_START) {
this.state.counts.tasks++; this.state.counts.tasks++;
changed = true;
} }
if (data.event === JOB_END) { if (data.event === JOB_END) {
this.setStatsEvent(data); this.setStatsEvent(data);
changed = true;
} }
this.dispatch(); if (changed) {
this.dispatch();
}
}; };
this.isExpectingStatsEvent = () => (this.jobType === 'job') || this.isExpectingStatsEvent = () => (this.jobType === 'job') ||

View File

@@ -398,6 +398,13 @@ function extend (method, related, config = {}) {
return Promise.reject(new Error(`No related property, ${related}, exists`)); return Promise.reject(new Error(`No related property, ${related}, exists`));
} }
function updateCount (count) {
this.page.count = count;
this.page.last = Math.ceil(count / this.page.size);
return this.page.last;
}
function goToPage (config) { function goToPage (config) {
const params = config.params || {}; const params = config.params || {};
const { page } = config; const { page } = config;
@@ -693,6 +700,7 @@ function BaseModel (resource, settings) {
this.extend = extend; this.extend = extend;
this.copy = copy; this.copy = copy;
this.getDependentResourceCounts = getDependentResourceCounts; this.getDependentResourceCounts = getDependentResourceCounts;
this.updateCount = updateCount;
this.http = { this.http = {
get: httpGet.bind(this), get: httpGet.bind(this),