# HG changeset patch # User Lucas Thompson # Date 1492775981 -3600 # Node ID 4865567d9e432027bbbf9d97074589cf8a195f13 # Parent 16d19c12e42fc22469938c1f10936a8c27d45826 Refactor feature extraction service to use piper streaming client/server. Change FeatureExtractionWorker accordingly and calling code. diff -r 16d19c12e42f -r 4865567d9e43 src/app/app.component.ts --- a/src/app/app.component.ts Fri Apr 21 12:58:55 2017 +0100 +++ b/src/app/app.component.ts Fri Apr 21 12:59:41 2017 +0100 @@ -19,6 +19,7 @@ audioBuffer: AudioBuffer; // TODO consider revising canExtract: boolean; private onAudioDataSubscription: Subscription; + private onProgressUpdated: Subscription; private analyses: AnalysisItem[]; // TODO some immutable state container describing entire session private nRecordings: number; // TODO user control for naming a recording private countingId: number; // TODO improve uniquely identifying items @@ -31,7 +32,7 @@ this.analyses = []; this.canExtract = false; this.nRecordings = 0; - this.countingId = 1; + this.countingId = 0; iconRegistry.addSvgIcon( 'duck', @@ -52,6 +53,13 @@ } } ); + this.onProgressUpdated = this.piperService.progressUpdated$.subscribe( + progress => { + const index = this.analyses.findIndex(val => val.id === progress.id); + if (index === -1) return; + this.analyses[index].progress = progress.value; + } + ); } onFileOpened(file: File | Blob) { @@ -80,7 +88,7 @@ isRoot: true, title: title, description: new Date().toLocaleString(), - id: `${this.countingId++}` + id: `${++this.countingId}` }); } @@ -95,15 +103,16 @@ isRoot: false, title: outputInfo.name, description: outputInfo.outputId, - id: `${this.countingId++}` + id: `${++this.countingId}` }); - this.piperService.collect({ + this.piperService.extract(`${this.countingId}`, { audioData: [...Array(this.audioBuffer.numberOfChannels).keys()] .map(i => this.audioBuffer.getChannelData(i)), audioFormat: { sampleRate: this.audioBuffer.sampleRate, - channelCount: this.audioBuffer.numberOfChannels + channelCount: this.audioBuffer.numberOfChannels, + length: this.audioBuffer.length }, key: outputInfo.extractorKey, outputId: outputInfo.outputId @@ -111,11 +120,13 @@ this.canExtract = true; }).catch(err => { this.canExtract = true; - console.error(err) + this.analyses.shift(); + console.error(`Error whilst extracting: ${err}`); }); } ngOnDestroy(): void { this.onAudioDataSubscription.unsubscribe(); + this.onProgressUpdated.unsubscribe(); } } diff -r 16d19c12e42f -r 4865567d9e43 src/app/services/feature-extraction/FeatureExtractionWorker.ts --- a/src/app/services/feature-extraction/FeatureExtractionWorker.ts Fri Apr 21 12:58:55 2017 +0100 +++ b/src/app/services/feature-extraction/FeatureExtractionWorker.ts Fri Apr 21 12:59:41 2017 +0100 @@ -4,18 +4,21 @@ import {PiperVampService, ListRequest, ListResponse} from 'piper'; import { - PiperSimpleClient, SimpleRequest, - SimpleResponse + SimpleRequest } from 'piper/HigherLevelUtilities'; import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule'; import {AvailableLibraries} from "./feature-extraction.service"; +import { + DedicatedWorkerGlobalScope, + WebWorkerStreamingServer +} from "piper/servers/WebWorkerStreamingServer"; +import { + PiperStreamingService, + StreamingResponse, + StreamingService +} from "piper/StreamingService"; +import {Observable} from "rxjs/Observable"; -// TODO TypeScript has a .d.ts file for webworkers, but for some reason it clashes with the typings for dom and causes compiler errors -interface WorkerGlobalScope { - onmessage: (this: this, ev: MessageEvent) => any; - postMessage(data: any): void; - importScripts(uri: string): void; -} interface MessageEvent { readonly data: any; @@ -26,20 +29,65 @@ type RequireJs = (libs: string[], callback: (...libs: any[]) => void) => void; +class AggregateStreamingService implements StreamingService { + private services: Map; + + constructor() { + this.services = new Map(); + this.services.set( + 'vamp-example-plugins', + new PiperStreamingService(new PiperVampService(VampExamplePlugins())) + ); + } + + addService(key: LibraryKey, service: PiperStreamingService): void { + this.services.set(key, service); + } + + list(request: ListRequest): Promise { + return Promise.all( + [...this.services.values()].map(client => client.list({})) + ).then(allAvailable => ({ + available: allAvailable.reduce( + (all, current) => all.concat(current.available), + [] + ) + }) + ); + } + + process(request: SimpleRequest): Observable { + return undefined; + } + + collect(request: SimpleRequest): Observable { + const key = request.key.split(':')[0]; + return this.services.has(key) ? + this.services.get(key).collect(request) : Observable.throw("Invalid key"); + } +} + export default class FeatureExtractionWorker { - private workerScope: WorkerGlobalScope; - private clients: Map; + private workerScope: DedicatedWorkerGlobalScope; + private services: Map; private remoteLibraries: Map; + private server: WebWorkerStreamingServer; + private service: AggregateStreamingService; - constructor(workerScope: WorkerGlobalScope, private requireJs: RequireJs) { + constructor(workerScope: DedicatedWorkerGlobalScope, + private requireJs: RequireJs) { this.workerScope = workerScope; - this.clients = new Map(); + this.services = new Map(); this.remoteLibraries = new Map(); - this.clients.set( - 'vamp-example-plugins', - new PiperSimpleClient(new PiperVampService(VampExamplePlugins())) + this.service = new AggregateStreamingService(); + this.setupImportLibraryListener(); + this.server = new WebWorkerStreamingServer( + this.workerScope, + this.service ); + } + private setupImportLibraryListener(): void { this.workerScope.onmessage = (ev: MessageEvent) => { const sendResponse = (result) => { this.workerScope.postMessage({ @@ -48,31 +96,18 @@ }); }; switch (ev.data.method) { - case 'list': - this.list(ev.data.params) - .then(sendResponse) - .catch(err => console.error(err)); // TODO handle error - break; - case 'process': - this.process(ev.data.params) - .then(sendResponse) - .catch(err => console.error(err)); // TODO handle error - break; - case 'collect': - this.collect(ev.data.params) - .then(sendResponse) - .catch(err => console.error(err)); // TODO handle error - break; case 'import': - // this.workerScope.importScripts(ev.data.params); const key: LibraryKey = ev.data.params; if (this.remoteLibraries.has(key)) { this.requireJs([this.remoteLibraries.get(key)], (plugin) => { - this.clients.set( + this.services.set( key, - new PiperSimpleClient(new PiperVampService(plugin.createLibrary())) + new PiperStreamingService( + new PiperVampService(plugin.createLibrary()) + ) ); // TODO won't always be an emscripten module - this.list({}).then(sendResponse); + this.service.addService(key, this.services.get(key)); + this.service.list({}).then(sendResponse); }); } else { console.error('Non registered library key.'); // TODO handle error @@ -86,30 +121,4 @@ } }; } - - private list(request: ListRequest): Promise { - // TODO actually pay attention to ListRequest - return Promise.all([...this.clients.values()].map(client => client.list({}))) - .then(allAvailable => { - return { - available: allAvailable.reduce( - (all, current) => all.concat(current.available), - [] - ) - }; - }); - } - - // TODO reduce dupe - private process(request: SimpleRequest): Promise { - const key: LibraryKey = request.key.split(':')[0]; - const client: PiperSimpleClient = this.clients.get(key); - return client ? client.process(request) : Promise.reject("Invalid plugin library key."); - } - - private collect(request: SimpleRequest): Promise { - const key: LibraryKey = request.key.split(':')[0]; - const client: PiperSimpleClient = this.clients.get(key); - return client ? client.collect(request) : Promise.reject("Invalid plugin library key."); - } } diff -r 16d19c12e42f -r 4865567d9e43 src/app/services/feature-extraction/feature-extraction.service.ts --- a/src/app/services/feature-extraction/feature-extraction.service.ts Fri Apr 21 12:58:55 2017 +0100 +++ b/src/app/services/feature-extraction/feature-extraction.service.ts Fri Apr 21 12:59:41 2017 +0100 @@ -1,13 +1,19 @@ import {Injectable, Inject} from '@angular/core'; import { - ListResponse, ListRequest + ListResponse } from "piper"; import { - SimpleRequest, SimpleResponse + SimpleRequest, + SimpleResponse } from "piper/HigherLevelUtilities"; import {Subject} from "rxjs/Subject"; import {Observable} from "rxjs"; import {Http, Response} from "@angular/http"; +import { + countingIdProvider, + WebWorkerStreamingClient +} from "piper/client-stubs/WebWorkerStreamingClient"; +import {RequestId} from "piper/protocols/WebWorkerProtocol"; interface RequestMessage { method: string; @@ -24,6 +30,11 @@ [libraryKey: string]: RepoUri; } +export interface Progress { + id: RequestId; + value: number; // between 0 and 100, for material-ui +} + @Injectable() export class FeatureExtractionService { @@ -32,6 +43,9 @@ featuresExtracted$: Observable; private librariesUpdated: Subject; librariesUpdated$: Observable; + private progressUpdated: Subject; + progressUpdated$: Observable; + private client: WebWorkerStreamingClient; constructor(private http: Http, @Inject('PiperRepoUri') private repositoryUri: RepoUri) { this.worker = new Worker('bootstrap-feature-extraction-worker.js'); @@ -39,40 +53,65 @@ this.featuresExtracted$ = this.featuresExtracted.asObservable(); this.librariesUpdated = new Subject(); this.librariesUpdated$ = this.librariesUpdated.asObservable(); + this.progressUpdated = new Subject(); + this.progressUpdated$ = this.progressUpdated.asObservable(); this.worker.addEventListener('message', (ev: MessageEvent) => { const isValidResponse = ev.data.method === 'import' - && ev.data.result.available !== undefined; + && ev.data.result && ev.data.result.available ; if (isValidResponse) { + (ev as Event).stopImmediatePropagation(); this.librariesUpdated.next(ev.data.result); } - }); + }, true); + + this.client = new WebWorkerStreamingClient( + this.worker, + countingIdProvider(0) + ) } list(): Promise { - return this.request( - {method: 'list', params: {}}, - (ev: MessageEvent) => ev.data.result.available !== undefined - ).then(msg => msg.result); + return this.client.list({}); } - process(request: SimpleRequest): Promise { - return this.request( - {method: 'process', params: request}, - (ev: MessageEvent) => ev.data.method === 'process' - ).then(msg => { - this.featuresExtracted.next(msg.result); - return msg.result; - }); - } - - collect(request: SimpleRequest): Promise { - return this.request( - {method: 'collect', params: request}, - (ev: MessageEvent) => ev.data.method === 'collect' - ).then(msg => { - this.featuresExtracted.next(msg.result); - return msg.result; - }); + extract(analysisItemId: string, request: SimpleRequest): Promise { + const arrayReducer = (acc, val) => { + acc.push.apply(acc, val); + return acc; + }; + const typedArrayReducer = (acc: Float32Array, + val: Float32Array): Float32Array => { + return Float32Array.of(...acc, ...val); + }; + return this.client.collect(request) + .do(val => { + this.progressUpdated.next({ + id: analysisItemId, + value: (val.processedBlockCount / val.totalBlockCount) * 100 + }); + }) + .reduce((acc, val) => { + if (acc.features.data instanceof Array && + val.features.data instanceof Array) { + acc.features.data = arrayReducer( + acc.features.data, + val.features.data + ); + } else if (acc.features.data instanceof Float32Array && + val.features.data instanceof Float32Array) { + acc.features.data = typedArrayReducer( + acc.features.data, + val.features.data + ); + } else { + throw "Invalid feature output. Aborting"; + } + return acc; + }) + .toPromise() + .then((response) => { + this.featuresExtracted.next(response); + }); } updateAvailableLibraries(): Observable { @@ -94,18 +133,4 @@ load(libraryKey: string): void { this.worker.postMessage({method: 'import', params: libraryKey}); } - - private request(request: RequestMessage, - predicate: (ev: MessageEvent) => boolean) - : Promise> { - return new Promise(res => { - const listener = (ev: MessageEvent) => { - this.worker.removeEventListener('message', listener); - if (predicate(ev)) - res(ev.data); - }; - this.worker.addEventListener('message', listener); - this.worker.postMessage(request); - }).catch(err => console.error(err)); - } }