# HG changeset patch # User Lucas Thompson # Date 1493219808 -3600 # Node ID 7106cdd59e62747de1563372fb8b31a917be353b # Parent ca5fcdfa08515be96795c5a589d4911e9a667be1 Combine the features as they come in. On every percentage point change, emit a StreamingResponse with the collected features so far, enabling progress reporting for the client. diff -r ca5fcdfa0851 -r 7106cdd59e62 src/app/services/feature-extraction/FeatureExtractionWorker.ts --- a/src/app/services/feature-extraction/FeatureExtractionWorker.ts Wed Apr 26 16:14:25 2017 +0100 +++ b/src/app/services/feature-extraction/FeatureExtractionWorker.ts Wed Apr 26 16:16:48 2017 +0100 @@ -7,7 +7,9 @@ SimpleRequest } from 'piper/HigherLevelUtilities'; import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule'; -import {AvailableLibraries} from './feature-extraction.service'; +import { + AvailableLibraries +} from './feature-extraction.service'; import { DedicatedWorkerGlobalScope, WebWorkerStreamingServer @@ -19,6 +21,7 @@ } from 'piper/StreamingService'; import {Observable} from 'rxjs/Observable'; import {EmscriptenModule} from 'piper/PiperVampService'; +import {streamingResponseReducer} from './FeatureReducers'; interface MessageEvent { readonly data: any; @@ -67,6 +70,29 @@ } } +class ReducingAggregateService extends AggregateStreamingService { + constructor() { + super(); + } + + collect(request: SimpleRequest): Observable { + let lastPercentagePoint = 0; + return super.collect(request) + .scan(streamingResponseReducer) + .filter(val => { + const percentage = + 100 * (val.processedBlockCount / val.totalBlockCount) | 0; + const pointDifference = (percentage - lastPercentagePoint); + if (pointDifference === 1 || percentage === 100) { + lastPercentagePoint = percentage; + return true; + } else { + return false; + } + }); + } +} + export default class FeatureExtractionWorker { private workerScope: DedicatedWorkerGlobalScope; private remoteLibraries: Map; @@ -77,7 +103,7 @@ private requireJs: RequireJs) { this.workerScope = workerScope; this.remoteLibraries = new Map(); - this.service = new AggregateStreamingService(); + this.service = new ReducingAggregateService(); this.setupImportLibraryListener(); this.server = new WebWorkerStreamingServer( this.workerScope,