Merge pull request #2131 from jakemcdermott/job-results/eof-counter

replace final_line_count with final_counter for EOF websocket
This commit is contained in:
Jake McDermott
2018-06-12 14:11:33 -04:00
committed by GitHub
5 changed files with 44 additions and 11 deletions

View File

@@ -162,7 +162,7 @@ class CallbackBrokerWorker(ConsumerMixin):
if body.get('event') == 'EOF': if body.get('event') == 'EOF':
try: try:
final_line_count = body.get('final_line_count', 0) final_counter = body.get('final_counter', 0)
logger.info('Event processing is finished for Job {}, sending notifications'.format(job_identifier)) logger.info('Event processing is finished for Job {}, sending notifications'.format(job_identifier))
# EOF events are sent when stdout for the running task is # EOF events are sent when stdout for the running task is
# closed. don't actually persist them to the database; we # closed. don't actually persist them to the database; we
@@ -170,7 +170,7 @@ class CallbackBrokerWorker(ConsumerMixin):
# approximation for when a job is "done" # approximation for when a job is "done"
emit_channel_notification( emit_channel_notification(
'jobs-summary', 'jobs-summary',
dict(group_name='jobs', unified_job_id=job_identifier, final_line_count=final_line_count) dict(group_name='jobs', unified_job_id=job_identifier, final_counter=final_counter)
) )
# Additionally, when we've processed all events, we should # Additionally, when we've processed all events, we should
# have all the data we need to send out success/failure # have all the data we need to send out success/failure

View File

@@ -989,7 +989,7 @@ class OutputEventFilter(object):
if value: if value:
self._emit_event(value) self._emit_event(value)
self._buffer = StringIO() self._buffer = StringIO()
self._event_callback(dict(event='EOF', final_line_count=self._start_line)) self._event_callback(dict(event='EOF', final_counter=self._counter - 1))
def _emit_event(self, buffered_stdout, next_event_data=None): def _emit_event(self, buffered_stdout, next_event_data=None):
next_event_data = next_event_data or {} next_event_data = next_event_data or {}

View File

@@ -163,6 +163,11 @@ function startListening () {
listeners.push($scope.$on(resource.ws.events, (scope, data) => handleJobEvent(data))); listeners.push($scope.$on(resource.ws.events, (scope, data) => handleJobEvent(data)));
listeners.push($scope.$on(resource.ws.status, (scope, data) => handleStatusEvent(data))); listeners.push($scope.$on(resource.ws.status, (scope, data) => handleStatusEvent(data)));
if (resource.model.get('type') === 'job') return;
if (resource.model.get('type') === 'project_update') return;
listeners.push($scope.$on(resource.ws.summary, (scope, data) => handleSummaryEvent(data)));
} }
function handleStatusEvent (data) { function handleStatusEvent (data) {
@@ -174,6 +179,13 @@ function handleJobEvent (data) {
status.pushJobEvent(data); status.pushJobEvent(data);
} }
function handleSummaryEvent (data) {
if (resource.model.get('id') !== data.unified_job_id) return;
if (!data.final_counter) return;
stream.setFinalCounter(data.final_counter);
}
function OutputIndexController ( function OutputIndexController (
_$compile_, _$compile_,
_$q_, _$q_,

View File

@@ -106,6 +106,7 @@ function resolveResource (
ws: { ws: {
events: `${WS_PREFIX}-${key}-${id}`, events: `${WS_PREFIX}-${key}-${id}`,
status: `${WS_PREFIX}-${name}`, status: `${WS_PREFIX}-${name}`,
summary: `${WS_PREFIX}-${name}-summary`,
}, },
page: { page: {
cache: PAGE_CACHE, cache: PAGE_CACHE,

View File

@@ -14,10 +14,10 @@ function OutputStream ($q) {
this.counters = { this.counters = {
used: [], used: [],
ready: [],
min: 1, min: 1,
max: 0, max: 0,
last: null, final: null,
ready: false,
}; };
this.state = { this.state = {
@@ -81,16 +81,14 @@ function OutputStream ($q) {
if (maxReady) { if (maxReady) {
minReady = this.counters.min; minReady = this.counters.min;
this.counters.ready = true;
this.counters.min = maxReady + 1; this.counters.min = maxReady + 1;
this.counters.used = this.counters.used.filter(c => c > maxReady); this.counters.used = this.counters.used.filter(c => c > maxReady);
} else {
this.counters.ready = false;
} }
this.counters.missing = missing; this.counters.missing = missing;
this.counters.ready = [minReady, maxReady];
return [minReady, maxReady]; return this.counters.ready;
}; };
this.pushJobEvent = data => { this.pushJobEvent = data => {
@@ -100,7 +98,7 @@ function OutputStream ($q) {
.then(() => { .then(() => {
if (data.event === JOB_END) { if (data.event === JOB_END) {
this.state.ending = true; this.state.ending = true;
this.counters.last = data.counter; this.counters.final = data.counter;
} }
const [minReady, maxReady] = this.updateCounterState(data); const [minReady, maxReady] = this.updateCounterState(data);
@@ -117,7 +115,7 @@ function OutputStream ($q) {
return $q.resolve(); return $q.resolve();
} }
const isLastFrame = this.state.ending && (maxReady >= this.counters.last); const isLastFrame = this.state.ending && (maxReady >= this.counters.final);
const events = this.hooks.bufferEmpty(minReady, maxReady); const events = this.hooks.bufferEmpty(minReady, maxReady);
return this.emitFrames(events, isLastFrame); return this.emitFrames(events, isLastFrame);
@@ -127,6 +125,27 @@ function OutputStream ($q) {
return this.chain; return this.chain;
}; };
this.setFinalCounter = counter => {
this.chain = this.chain
.then(() => {
this.state.ending = true;
this.counters.final = counter;
if (counter >= this.counters.min) {
return $q.resolve();
}
let events = [];
if (this.counters.ready.length > 0) {
events = this.hooks.bufferEmpty(...this.counters.ready);
}
return this.emitFrames(events, true);
});
return this.chain;
};
this.emitFrames = (events, last) => this.hooks.onFrames(events) this.emitFrames = (events, last) => this.hooks.onFrames(events)
.then(() => { .then(() => {
if (last) { if (last) {
@@ -136,6 +155,7 @@ function OutputStream ($q) {
this.hooks.onStop(); this.hooks.onStop();
} }
this.counters.ready.length = 0;
return $q.resolve(); return $q.resolve();
}); });
} }