Mercurial > hg > ugly-duckling
changeset 243:7106cdd59e62
Combine the features as they come in. On every percentage point change, emit a StreamingResponse with the collected features so far, enabling progress reporting for the client.
author | Lucas Thompson <dev@lucas.im> |
---|---|
date | Wed, 26 Apr 2017 16:16:48 +0100 |
parents | ca5fcdfa0851 |
children | b8d1988b774f |
files | src/app/services/feature-extraction/FeatureExtractionWorker.ts |
diffstat | 1 files changed, 28 insertions(+), 2 deletions(-) [+] |
line wrap: on
line diff
--- a/src/app/services/feature-extraction/FeatureExtractionWorker.ts Wed Apr 26 16:14:25 2017 +0100 +++ b/src/app/services/feature-extraction/FeatureExtractionWorker.ts Wed Apr 26 16:16:48 2017 +0100 @@ -7,7 +7,9 @@ SimpleRequest } from 'piper/HigherLevelUtilities'; import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule'; -import {AvailableLibraries} from './feature-extraction.service'; +import { + AvailableLibraries +} from './feature-extraction.service'; import { DedicatedWorkerGlobalScope, WebWorkerStreamingServer @@ -19,6 +21,7 @@ } from 'piper/StreamingService'; import {Observable} from 'rxjs/Observable'; import {EmscriptenModule} from 'piper/PiperVampService'; +import {streamingResponseReducer} from './FeatureReducers'; interface MessageEvent { readonly data: any; @@ -67,6 +70,29 @@ } } +class ReducingAggregateService extends AggregateStreamingService { + constructor() { + super(); + } + + collect(request: SimpleRequest): Observable<StreamingResponse> { + let lastPercentagePoint = 0; + return super.collect(request) + .scan(streamingResponseReducer) + .filter(val => { + const percentage = + 100 * (val.processedBlockCount / val.totalBlockCount) | 0; + const pointDifference = (percentage - lastPercentagePoint); + if (pointDifference === 1 || percentage === 100) { + lastPercentagePoint = percentage; + return true; + } else { + return false; + } + }); + } +} + export default class FeatureExtractionWorker { private workerScope: DedicatedWorkerGlobalScope; private remoteLibraries: Map<LibraryKey, LibraryUri>; @@ -77,7 +103,7 @@ private requireJs: RequireJs) { this.workerScope = workerScope; this.remoteLibraries = new Map<LibraryKey, LibraryUri>(); - this.service = new AggregateStreamingService(); + this.service = new ReducingAggregateService(); this.setupImportLibraryListener(); this.server = new WebWorkerStreamingServer( this.workerScope,