dev@40: /** dev@40: * Created by lucas on 01/12/2016. dev@40: */ dev@40: dev@324: import {PiperVampService, ListRequest, ListResponse} from 'piper'; dev@72: import { dev@226: SimpleRequest dev@72: } from 'piper/HigherLevelUtilities'; dev@44: import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule'; dev@243: import { dev@243: AvailableLibraries dev@243: } from './feature-extraction.service'; dev@226: import { dev@226: DedicatedWorkerGlobalScope, dev@226: WebWorkerStreamingServer dev@236: } from 'piper/servers/WebWorkerStreamingServer'; dev@226: import { dev@226: PiperStreamingService, dev@226: StreamingResponse, dev@226: StreamingService dev@236: } from 'piper/StreamingService'; dev@236: import {Observable} from 'rxjs/Observable'; dev@236: import {EmscriptenModule} from 'piper/PiperVampService'; dev@243: import {streamingResponseReducer} from './FeatureReducers'; dev@40: dev@40: interface MessageEvent { dev@40: readonly data: any; dev@40: } dev@40: dev@72: type LibraryUri = string; dev@72: type LibraryKey = string; dev@72: dev@72: type RequireJs = (libs: string[], callback: (...libs: any[]) => void) => void; dev@322: type Factory = () => T; dev@72: dev@324: function waterfall(tasks: (() => Promise)[]): Promise { dev@324: const reducer = (running: T[], next: Promise): Promise => { dev@324: return next.then(response => { dev@324: running = running.concat(response); dev@324: return running; dev@324: }); dev@324: }; dev@324: dev@324: return tasks.reduce((runningResponses, nextResponse) => { dev@324: return runningResponses.then(response => { dev@324: return reducer(response, nextResponse()); dev@324: }) dev@324: }, Promise.resolve([])); dev@324: } dev@324: dev@226: class AggregateStreamingService implements StreamingService { dev@322: private services: Map>; dev@226: dev@226: constructor() { dev@322: this.services = new Map>(); dev@322: this.services.set( dev@322: 'vamp-example-plugins', dev@322: () => new PiperStreamingService( dev@322: new PiperVampService(VampExamplePlugins()) dev@322: ) dev@322: ); dev@226: } dev@226: dev@322: addService(key: LibraryKey, service: Factory): void { dev@226: this.services.set(key, service); dev@226: } dev@226: dev@324: hasRemoteService(key: LibraryKey): boolean { dev@324: return this.services.has(key); dev@324: } dev@324: dev@226: list(request: ListRequest): Promise { dev@323: const listThunks: (() => Promise)[] = [ dev@323: ...this.services.values() dev@323: ].map(client => () => client().list({})); dev@323: dev@324: return waterfall(listThunks).then(responses => { dev@324: return responses.reduce((allAvailable, res) => { dev@324: allAvailable.available = allAvailable.available.concat(res.available); dev@324: return allAvailable; dev@324: }, {available: []}); dev@324: }) dev@226: } dev@226: dev@226: process(request: SimpleRequest): Observable { dev@248: return this.dispatch('process', request); dev@226: } dev@226: dev@305: protected dispatch(method: 'process', dev@248: request: SimpleRequest): Observable { dev@226: const key = request.key.split(':')[0]; dev@322: return this.services.has(key) ? this.services.get(key)()[method](request) : dev@322: Observable.throw('Invalid key'); dev@226: } dev@226: } dev@226: dev@250: class ThrottledReducingAggregateService extends AggregateStreamingService { dev@243: constructor() { dev@243: super(); dev@243: } dev@243: dev@305: protected dispatch(method: 'process', dev@248: request: SimpleRequest): Observable { dev@243: let lastPercentagePoint = 0; dev@305: let shouldClear = false; dev@248: return super.dispatch(method, request) dev@305: .scan((acc, value) => { dev@305: if (shouldClear) { dev@305: acc.features = []; dev@305: } dev@305: return streamingResponseReducer(acc, value); dev@305: }) dev@243: .filter(val => { dev@305: const progress = val.progress; dev@243: const percentage = dev@305: 100 * (progress.processedBlockCount / progress.totalBlockCount) | 0; dev@243: const pointDifference = (percentage - lastPercentagePoint); dev@249: const shouldEmit = pointDifference === 1 || percentage === 100; dev@249: if (shouldEmit) { dev@243: lastPercentagePoint = percentage; dev@243: } dev@305: shouldClear = shouldEmit; dev@249: return shouldEmit; dev@243: }); dev@243: } dev@243: } dev@243: dev@40: export default class FeatureExtractionWorker { dev@226: private workerScope: DedicatedWorkerGlobalScope; dev@72: private remoteLibraries: Map; dev@226: private server: WebWorkerStreamingServer; dev@226: private service: AggregateStreamingService; dev@40: dev@226: constructor(workerScope: DedicatedWorkerGlobalScope, dev@226: private requireJs: RequireJs) { dev@40: this.workerScope = workerScope; dev@324: this.remoteLibraries = new Map(); dev@250: this.service = new ThrottledReducingAggregateService(); dev@226: this.setupImportLibraryListener(); dev@226: this.server = new WebWorkerStreamingServer( dev@226: this.workerScope, dev@226: this.service dev@72: ); dev@226: } dev@72: dev@226: private setupImportLibraryListener(): void { dev@229: dev@44: this.workerScope.onmessage = (ev: MessageEvent) => { dev@44: switch (ev.data.method) { dev@72: case 'addRemoteLibraries': // TODO rename dev@72: const available: AvailableLibraries = ev.data.params; dev@324: const importThunks = Object.keys(available).map(libraryKey => { dev@324: return () => { dev@324: this.remoteLibraries.set(libraryKey, available[libraryKey]); dev@324: return this.import(libraryKey).then(key => { dev@324: return key; dev@324: }); dev@324: }; dev@72: }); dev@324: waterfall(importThunks).then(() => { dev@324: this.service.list({}).then(response => { dev@324: this.workerScope.postMessage({ dev@324: method: 'import', dev@324: result: response dev@324: }); dev@324: }); dev@324: }) dev@44: } dev@44: }; dev@40: } dev@324: dev@324: private import(key: LibraryKey): Promise { // TODO return type? dev@324: return new Promise((res, rej) => { dev@324: if (this.remoteLibraries.has(key)) { dev@324: // TODO RequireJs can fail... need to reject the promise then dev@324: this.requireJs([this.remoteLibraries.get(key)], (plugin) => { dev@324: dev@324: const service = () => { dev@324: // TODO a factory with more logic probably belongs in piper-js dev@324: const lib: any | EmscriptenModule = plugin.createLibrary(); dev@324: const isEmscriptenModule = typeof lib.cwrap === 'function'; dev@324: return new PiperStreamingService( dev@324: isEmscriptenModule ? new PiperVampService(lib) : lib // TODO dev@324: ); dev@324: }; dev@324: this.service.addService(key, service); dev@324: res(key); dev@324: }); dev@324: } else { dev@324: rej('Invalid remote library key'); dev@324: } dev@324: }); dev@324: } dev@40: }