dev@40: /** dev@40: * Created by lucas on 01/12/2016. dev@40: */ dev@40: dev@88: import {PiperVampService, ListRequest, ListResponse} from 'piper'; dev@72: import { dev@226: SimpleRequest dev@72: } from 'piper/HigherLevelUtilities'; dev@44: import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule'; dev@243: import { dev@243: AvailableLibraries dev@243: } from './feature-extraction.service'; dev@226: import { dev@226: DedicatedWorkerGlobalScope, dev@226: WebWorkerStreamingServer dev@236: } from 'piper/servers/WebWorkerStreamingServer'; dev@226: import { dev@226: PiperStreamingService, dev@226: StreamingResponse, dev@226: StreamingService dev@236: } from 'piper/StreamingService'; dev@236: import {Observable} from 'rxjs/Observable'; dev@236: import {EmscriptenModule} from 'piper/PiperVampService'; dev@243: import {streamingResponseReducer} from './FeatureReducers'; dev@40: dev@40: interface MessageEvent { dev@40: readonly data: any; dev@40: } dev@40: dev@72: type LibraryUri = string; dev@72: type LibraryKey = string; dev@72: dev@72: type RequireJs = (libs: string[], callback: (...libs: any[]) => void) => void; dev@72: dev@226: class AggregateStreamingService implements StreamingService { dev@226: private services: Map; dev@226: dev@226: constructor() { dev@226: this.services = new Map(); dev@226: this.services.set( dev@226: 'vamp-example-plugins', dev@226: new PiperStreamingService(new PiperVampService(VampExamplePlugins())) dev@226: ); dev@226: } dev@226: dev@226: addService(key: LibraryKey, service: PiperStreamingService): void { dev@226: this.services.set(key, service); dev@226: } dev@226: dev@226: list(request: ListRequest): Promise { dev@226: return Promise.all( dev@226: [...this.services.values()].map(client => client.list({})) dev@226: ).then(allAvailable => ({ dev@226: available: allAvailable.reduce( dev@226: (all, current) => all.concat(current.available), dev@226: [] dev@226: ) dev@226: }) dev@226: ); dev@226: } dev@226: dev@226: process(request: SimpleRequest): Observable { dev@248: return this.dispatch('process', request); dev@226: } dev@226: dev@226: collect(request: SimpleRequest): Observable { dev@248: return this.dispatch('collect', request); dev@248: } dev@248: dev@248: protected dispatch(method: 'process' | 'collect', dev@248: request: SimpleRequest): Observable { dev@226: const key = request.key.split(':')[0]; dev@226: return this.services.has(key) ? dev@248: this.services.get(key)[method](request) : Observable.throw('Invalid key'); dev@226: } dev@226: } dev@226: dev@250: class ThrottledReducingAggregateService extends AggregateStreamingService { dev@243: constructor() { dev@243: super(); dev@243: } dev@243: dev@248: protected dispatch(method: 'process' | 'collect', dev@248: request: SimpleRequest): Observable { dev@243: let lastPercentagePoint = 0; dev@248: return super.dispatch(method, request) dev@243: .scan(streamingResponseReducer) dev@243: .filter(val => { dev@243: const percentage = dev@243: 100 * (val.processedBlockCount / val.totalBlockCount) | 0; dev@243: const pointDifference = (percentage - lastPercentagePoint); dev@249: const shouldEmit = pointDifference === 1 || percentage === 100; dev@249: if (shouldEmit) { dev@243: lastPercentagePoint = percentage; dev@243: } dev@249: return shouldEmit; dev@243: }); dev@243: } dev@243: } dev@243: dev@40: export default class FeatureExtractionWorker { dev@226: private workerScope: DedicatedWorkerGlobalScope; dev@72: private remoteLibraries: Map; dev@226: private server: WebWorkerStreamingServer; dev@226: private service: AggregateStreamingService; dev@40: dev@226: constructor(workerScope: DedicatedWorkerGlobalScope, dev@226: private requireJs: RequireJs) { dev@40: this.workerScope = workerScope; dev@72: this.remoteLibraries = new Map(); dev@250: this.service = new ThrottledReducingAggregateService(); dev@226: this.setupImportLibraryListener(); dev@226: this.server = new WebWorkerStreamingServer( dev@226: this.workerScope, dev@226: this.service dev@72: ); dev@226: } dev@72: dev@226: private setupImportLibraryListener(): void { dev@229: dev@44: this.workerScope.onmessage = (ev: MessageEvent) => { dev@64: const sendResponse = (result) => { dev@64: this.workerScope.postMessage({ dev@64: method: ev.data.method, dev@64: result: result dev@64: }); dev@64: }; dev@44: switch (ev.data.method) { dev@72: case 'import': dev@72: const key: LibraryKey = ev.data.params; dev@72: if (this.remoteLibraries.has(key)) { dev@72: this.requireJs([this.remoteLibraries.get(key)], (plugin) => { dev@229: // TODO a factory with more logic probably belongs in piper-js dev@229: const lib: any | EmscriptenModule = plugin.createLibrary(); dev@229: const isEmscriptenModule = typeof lib.cwrap === 'function'; dev@229: const service = new PiperStreamingService( dev@229: isEmscriptenModule ? new PiperVampService(lib) : lib // TODO dev@229: ); dev@229: this.service.addService(key, service); dev@226: this.service.list({}).then(sendResponse); dev@72: }); dev@72: } else { dev@72: console.error('Non registered library key.'); // TODO handle error dev@72: } dev@72: break; dev@72: case 'addRemoteLibraries': // TODO rename dev@72: const available: AvailableLibraries = ev.data.params; dev@236: Object.keys(available).forEach(libraryKey => { dev@236: this.remoteLibraries.set(libraryKey, available[libraryKey]); dev@72: }); dev@44: } dev@44: }; dev@40: } dev@40: }