/**
 * Created by lucas on 01/12/2016.
 */

import {ListRequest, ListResponse, Service} from 'piper-js/core';
import {EmscriptenService as PiperVampService} from 'piper-js/emscripten';
import {
  OneShotExtractionRequest as SimpleRequest,
} from 'piper-js/one-shot';
import { VampExamplePlugins } from 'piper-js/ext/VampExamplePluginsModule';
import {
  AvailableLibraries
} from './feature-extraction.service';
import {
  DedicatedWorkerGlobalScope,
  WebWorkerStreamingServer
} from 'piper-js/web-worker';
import {
  PiperStreamingService,
  StreamingResponse,
  StreamingService
} from 'piper-js/streaming';
import {Observable} from 'rxjs/Observable';
import {EmscriptenModule} from 'piper-js/emscripten';
import {streamingResponseReducer} from './FeatureReducers';

/** Minimal shape of the worker message events this file reads. */
interface MessageEvent {
  readonly data: any;
}

type LibraryUri = string;
type LibraryKey = string;

/** Signature of the AMD-style loader injected into the worker. */
type RequireJs = (libs: string[],
                  callback: (...libs: any[]) => void,
                  errBack: (...failedLibIds: string[]) => void) => void;
/** A zero-argument function producing a fresh `T` on every call. */
type Factory<T> = () => T;

/**
 * Raised by `waterfall` when one of the queued tasks fails.
 *
 * `previousResponses` is the promise for the responses of all tasks that
 * completed before the failing one, so callers can fall back to partial
 * results.
 */
class QueuedTaskFailure extends Error {
  public previousResponses: Promise<any[]>;

  constructor(previousResponses: Promise<any[]>, message?: string) {
    super(message || 'Queued task failed.');
    // Subclassing Error loses the prototype chain when compiled to ES5;
    // restore it so `instanceof QueuedTaskFailure` works on every target.
    Object.setPrototypeOf(this, QueuedTaskFailure.prototype);
    this.name = 'QueuedTaskFailure';
    this.previousResponses = previousResponses;
  }
}

/**
 * Extracts a human-readable message from an arbitrary rejection reason.
 *
 * @returns the string reason, or the reason's `message`, or `undefined`
 *   when neither is a non-empty string.
 */
function describeFailure(reason: any): string | undefined {
  if (typeof reason === 'string') {
    return reason || undefined;
  }
  const message = reason && reason.message;
  return typeof message === 'string' && message ? message : undefined;
}

/**
 * Runs the given tasks strictly one after another and collects their
 * responses in order.
 *
 * A task is only started once the previous one has resolved. If a task
 * throws synchronously OR its promise rejects, the returned promise rejects
 * with a `QueuedTaskFailure` carrying the responses gathered so far; the
 * remaining tasks are not started.
 *
 * @param tasks thunks that start a piece of asynchronous work when called
 * @returns a promise for the responses of all tasks, in task order
 */
function waterfall<T>(tasks: (() => Promise<T>)[]): Promise<T[]> {
  return tasks.reduce<Promise<T[]>>((completed, startNext) => {
    return completed.then(responses => {
      const fail = (reason: any): never => {
        throw new QueuedTaskFailure(completed, describeFailure(reason));
      };

      let pending: Promise<T>;
      try {
        pending = startNext();
      } catch (e) {
        return fail(e);
      }
      // Note: `concat` flattens one level if a response is itself an array.
      return pending.then(response => responses.concat(response), fail);
    });
  }, Promise.resolve<T[]>([]));
}

/**
 * Merges several list responses into one whose `available` array is the
 * concatenation of every response's `available` array, in order.
 */
function flattenListResponses(responses: ListResponse[]): ListResponse {
  return {
    available: responses.reduce<ListResponse['available']>(
      (flat, res) => flat.concat(res.available),
      []
    )
  };
}

/**
 * A streaming service which fans out to several underlying services, one
 * per library key. Services are stored as factories and a new instance is
 * created for every `list` / `process` call.
 */
class AggregateStreamingService implements StreamingService {
  private services: Map<LibraryKey, Factory<StreamingService>>;

  constructor() {
    this.services = new Map<LibraryKey, Factory<StreamingService>>();
    // The example plugins are bundled, so they are always registered.
    this.services.set(
      'vamp-example-plugins',
      () => new PiperStreamingService(
        new PiperVampService(VampExamplePlugins())
      )
    );
  }

  /** Registers (or replaces) the service factory stored under `key`. */
  addService(key: LibraryKey, service: Factory<StreamingService>): void {
    this.services.set(key, service);
  }

  /**
   * Lists every registered service, one at a time, and merges the results.
   *
   * NOTE(review): `request` is not forwarded; each service is queried with
   * an empty request (`{}`) - confirm this is intended.
   */
  list(request: ListRequest): Promise<ListResponse> {
    const listThunks: (() => Promise<ListResponse>)[] = [
      ...this.services.values()
    ].map(createClient => () => createClient().list({}));
    return waterfall(listThunks)
      .then(flattenListResponses);
  }

  /** Forwards the extraction request to the service owning its key. */
  process(request: SimpleRequest): Observable<StreamingResponse> {
    return this.dispatch('process', request);
  }

  /**
   * Routes `request` to the service registered under the library part of
   * `request.key` (the text before the first ':').
   *
   * @returns the service's response stream, or an observable which errors
   *   with 'Invalid key' when no service is registered under that key.
   */
  protected dispatch(method: 'process',
                     request: SimpleRequest): Observable<StreamingResponse> {
    const key = request.key.split(':')[0];
    return this.services.has(key) ?
      this.services.get(key)()[method](request) :
      Observable.throw('Invalid key');
  }
}

/**
 * Aggregate service which accumulates streamed responses and only lets an
 * accumulated response through at whole-percentage progress steps.
 */
class ThrottledReducingAggregateService extends AggregateStreamingService {
  /**
   * Wraps the parent's response stream:
   *  - `scan` folds each incoming response into an accumulator using
   *    `streamingResponseReducer`; the accumulator's `features` are reset
   *    right after an emission so each emission carries only new data
   *    (presumably - depends on the reducer; verify against FeatureReducers).
   *  - `filter` emits only when the integer percentage has advanced by
   *    exactly one point since the last emission, or has reached 100.
   *
   * NOTE(review): because the test is `=== 1`, a stream whose progress
   * jumps by more than one point per response emits nothing until 100%.
   */
  protected dispatch(method: 'process',
                     request: SimpleRequest): Observable<StreamingResponse> {
    let lastPercentagePoint = 0;
    let shouldClear = false;
    return super.dispatch(method, request)
      .scan((acc, value) => {
        if (shouldClear) {
          acc.features = [];
        }
        return streamingResponseReducer(acc, value);
      })
      .filter(val => {
        const progress = val.progress;
        // `| 0` truncates to an integer percentage (and maps NaN to 0).
        const percentage =
          (100 * (progress.processedBlockCount / progress.totalBlockCount)) | 0;
        const pointDifference = percentage - lastPercentagePoint;
        const shouldEmit = pointDifference === 1 || percentage === 100;
        if (shouldEmit) {
          lastPercentagePoint = percentage;
        }
        shouldClear = shouldEmit;
        return shouldEmit;
      });
  }
}

/**
 * Entry point for the feature extraction web worker. Serves an aggregate
 * streaming service over the worker scope and lets the main thread register
 * additional remote libraries at runtime.
 */
export default class FeatureExtractionWorker {
  private workerScope: DedicatedWorkerGlobalScope;
  private server: WebWorkerStreamingServer;
  private service: AggregateStreamingService;

  /**
   * @param workerScope the worker's global scope, used for messaging
   * @param requireJs loader used to fetch remote library modules
   */
  constructor(workerScope: DedicatedWorkerGlobalScope,
              private requireJs: RequireJs) {
    this.workerScope = workerScope;
    this.service = new ThrottledReducingAggregateService();
    this.setupImportLibraryListener();
    this.server = new WebWorkerStreamingServer(
      this.workerScope,
      this.service
    );
  }

  /**
   * Installs the `onmessage` handler reacting to 'addRemoteLibraries'
   * messages; any other message (or one without data) is ignored here.
   */
  private setupImportLibraryListener(): void {
    this.workerScope.onmessage = (ev: MessageEvent) => {
      const message = ev.data;
      if (!message) {
        return;
      }
      switch (message.method) {
        case 'addRemoteLibraries': { // TODO rename
          this.addRemoteLibraries(message.params);
          break;
        }
      }
    };
  }

  /**
   * Downloads and registers the given libraries one after another, then
   * posts an 'import' message listing everything that is available.
   *
   * Failures are best-effort: if a download fails, the libraries registered
   * up to that point (and the bundled ones) are still listed; if listing
   * itself fails part-way, the list responses gathered so far are posted.
   */
  private addRemoteLibraries(available: AvailableLibraries): void {
    const importThunks = Object.keys(available).map(libraryKey => () =>
      this.downloadRemoteLibrary(libraryKey, available[libraryKey])
        .then(createService => {
          this.service.addService(
            libraryKey,
            () => new PiperStreamingService(createService())
          );
        })
    );

    const reply = (result: ListResponse): void => {
      this.workerScope.postMessage({
        method: 'import',
        result: result
      });
    };

    waterfall(importThunks)
      .catch(e => {
        // Import thunks resolve to nothing, so there are no partial results
        // to flatten; simply carry on and list what did get registered.
        console.warn(`${e.message}. Try using results so far`);
      })
      .then(() => this.service.list({}))
      .then(reply)
      .catch(e => {
        if (e instanceof QueuedTaskFailure) {
          // Listing failed part-way: previousResponses are ListResponses.
          console.warn(`${e.message}. Try using results so far`);
          return e.previousResponses.then(
            responses => reply(flattenListResponses(responses))
          );
        }
        throw e;
      })
      .catch(e => {
        console.error('Unable to report imported libraries.', e);
      });
  }

  /**
   * Loads the module at `uri` through the injected loader.
   *
   * @param key library key, used only for the error message
   * @param uri location handed to the loader
   * @returns a promise for a factory which instantiates the loaded module;
   *   if the instance exposes a `cwrap` function it is treated as an
   *   Emscripten module and wrapped in a `PiperVampService`, otherwise it
   *   is returned as-is. Rejects with an `Error` if the loader reports a
   *   failure.
   */
  private downloadRemoteLibrary(key: LibraryKey,
                                uri: LibraryUri): Promise<Factory<Service>> {
    return new Promise<Factory<Service>>((resolve, reject) => {
      this.requireJs(
        [uri],
        (createModule) => {
          resolve(() => {
            // TODO a factory with more logic probably belongs in piper-js
            const lib: any | EmscriptenModule = createModule();
            const isEmscriptenModule = typeof lib.cwrap === 'function';
            return isEmscriptenModule ? new PiperVampService(lib) : lib; // TODO
          });
        },
        () => {
          reject(new Error(`Failed to load ${key} remote module.`));
        }
      );
    });
  }
}