# HG changeset patch # User Lucas Thompson # Date 1493220111 -3600 # Node ID b8d1988b774f3b9acb8a85b809e523bf58e3bccf # Parent c03b6a1eb9c921c173a1ac169eaa728ba748cd76# Parent 7106cdd59e62747de1563372fb8b31a917be353b Merge pull request #18 from LucasThompson/fix/slow-streaming Fix/slow streaming diff -r c03b6a1eb9c9 -r b8d1988b774f src/app/services/feature-extraction/FeatureExtractionWorker.ts --- a/src/app/services/feature-extraction/FeatureExtractionWorker.ts Wed Apr 26 09:38:17 2017 +0100 +++ b/src/app/services/feature-extraction/FeatureExtractionWorker.ts Wed Apr 26 16:21:51 2017 +0100 @@ -7,7 +7,9 @@ SimpleRequest } from 'piper/HigherLevelUtilities'; import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule'; -import {AvailableLibraries} from './feature-extraction.service'; +import { + AvailableLibraries +} from './feature-extraction.service'; import { DedicatedWorkerGlobalScope, WebWorkerStreamingServer @@ -19,6 +21,7 @@ } from 'piper/StreamingService'; import {Observable} from 'rxjs/Observable'; import {EmscriptenModule} from 'piper/PiperVampService'; +import {streamingResponseReducer} from './FeatureReducers'; interface MessageEvent { readonly data: any; @@ -67,6 +70,29 @@ } } +class ReducingAggregateService extends AggregateStreamingService { + constructor() { + super(); + } + + collect(request: SimpleRequest): Observable { + let lastPercentagePoint = 0; + return super.collect(request) + .scan(streamingResponseReducer) + .filter(val => { + const percentage = + 100 * (val.processedBlockCount / val.totalBlockCount) | 0; + const pointDifference = (percentage - lastPercentagePoint); + if (pointDifference === 1 || percentage === 100) { + lastPercentagePoint = percentage; + return true; + } else { + return false; + } + }); + } +} + export default class FeatureExtractionWorker { private workerScope: DedicatedWorkerGlobalScope; private remoteLibraries: Map; @@ -77,7 +103,7 @@ private requireJs: RequireJs) { this.workerScope = workerScope; this.remoteLibraries = new Map(); - this.service = new AggregateStreamingService(); + this.service = new ReducingAggregateService(); this.setupImportLibraryListener(); this.server = new WebWorkerStreamingServer( this.workerScope, diff -r c03b6a1eb9c9 -r b8d1988b774f src/app/services/feature-extraction/FeatureReducers.ts --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/app/services/feature-extraction/FeatureReducers.ts Wed Apr 26 16:21:51 2017 +0100 @@ -0,0 +1,63 @@ +/** + * Created by lucast on 26/04/2017. + */ +import {StreamingResponse} from 'piper/StreamingService'; +import {Feature} from 'piper/Feature'; +import {SampleType} from 'piper'; + +export const arrayReducer = (acc: T[], val: T[]): T[] => { + acc.push.apply(acc, val); + return acc; +}; + +export const typedArrayReducer = (acc: Float32Array, + val: Float32Array): Float32Array => { + return Float32Array.of(...acc, ...val); +}; + +const inPlaceTypedArrayReducer = (acc: Float32Array, + val: Float32Array, + i: number): Float32Array => { + acc.set(val, i); + return acc; +}; + +export const streamingResponseReducer = (acc: StreamingResponse, + val: StreamingResponse, + i: number): StreamingResponse => { + acc.processedBlockCount = val.processedBlockCount; + if (acc.features.data instanceof Array && + val.features.data instanceof Array) { + acc.features.data = arrayReducer( + acc.features.data, + val.features.data + ); + } else if (acc.features.data instanceof Float32Array && + val.features.data instanceof Float32Array) { + const isOneSamplePerStep = acc.outputDescriptor.configured.sampleType === + SampleType.OneSamplePerStep; + if (isOneSamplePerStep) { + // for one sample per step vectors we know there will be totalBlockCount + // number of samples - so pre-allocate the Float32Array when we know + // the totalBlockCount (after receiving the first feature) + if ( i === 1 ) { + const newBlock = new Float32Array(acc.totalBlockCount); + newBlock[0] = acc.features.data[0]; + acc.features.data = newBlock; + } + acc.features.data = inPlaceTypedArrayReducer( + acc.features.data, + val.features.data, + i + ); + } else { // if not OneSamplePerStep we have to make a new array each time + acc.features.data = typedArrayReducer( + acc.features.data, + val.features.data + ); + } + } else { + throw new Error('Invalid feature output. Aborting'); + } + return acc; +}; diff -r c03b6a1eb9c9 -r b8d1988b774f src/app/services/feature-extraction/feature-extraction.service.ts --- a/src/app/services/feature-extraction/feature-extraction.service.ts Wed Apr 26 09:38:17 2017 +0100 +++ b/src/app/services/feature-extraction/feature-extraction.service.ts Wed Apr 26 16:21:51 2017 +0100 @@ -66,14 +66,6 @@ } extract(analysisItemId: string, request: SimpleRequest): Promise { - const arrayReducer = (acc, val) => { - acc.push.apply(acc, val); - return acc; - }; - const typedArrayReducer = (acc: Float32Array, - val: Float32Array): Float32Array => { - return Float32Array.of(...acc, ...val); - }; return this.client.collect(request) .do(val => { this.progressUpdated.next({ @@ -81,24 +73,6 @@ value: (val.processedBlockCount / val.totalBlockCount) * 100 }); }) - .reduce((acc, val) => { - if (acc.features.data instanceof Array && - val.features.data instanceof Array) { - acc.features.data = arrayReducer( - acc.features.data, - val.features.data - ); - } else if (acc.features.data instanceof Float32Array && - val.features.data instanceof Float32Array) { - acc.features.data = typedArrayReducer( - acc.features.data, - val.features.data - ); - } else { - throw new Error('Invalid feature output. Aborting'); - } - return acc; - }) .toPromise() .then((response) => { this.featuresExtracted.next(response);