annotate src/app/services/feature-extraction/FeatureExtractionWorker.ts @ 322:38886ce7e2e5

Instantiate services on list and process, trying to ensure only one emscripten module instantiated at a time (hopefully allowing for garbage collection).
author Lucas Thompson <dev@lucas.im>
date Tue, 16 May 2017 10:44:55 +0100
parents 89208e8af8cc
children 72673c954216
rev   line source
dev@40 1 /**
dev@40 2 * Created by lucas on 01/12/2016.
dev@40 3 */
dev@40 4
dev@322 5 import {PiperVampService, ListRequest, ListResponse, Service} from 'piper';
dev@72 6 import {
dev@226 7 SimpleRequest
dev@72 8 } from 'piper/HigherLevelUtilities';
dev@44 9 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule';
dev@243 10 import {
dev@243 11 AvailableLibraries
dev@243 12 } from './feature-extraction.service';
dev@226 13 import {
dev@226 14 DedicatedWorkerGlobalScope,
dev@226 15 WebWorkerStreamingServer
dev@236 16 } from 'piper/servers/WebWorkerStreamingServer';
dev@226 17 import {
dev@226 18 PiperStreamingService,
dev@226 19 StreamingResponse,
dev@226 20 StreamingService
dev@236 21 } from 'piper/StreamingService';
dev@236 22 import {Observable} from 'rxjs/Observable';
dev@236 23 import {EmscriptenModule} from 'piper/PiperVampService';
dev@243 24 import {streamingResponseReducer} from './FeatureReducers';
dev@40 25
dev@40 26 interface MessageEvent {
dev@40 27 readonly data: any;
dev@40 28 }
dev@40 29
dev@72 30 type LibraryUri = string;
dev@72 31 type LibraryKey = string;
dev@72 32
dev@72 33 type RequireJs = (libs: string[], callback: (...libs: any[]) => void) => void;
dev@322 34 type Factory<T> = () => T;
dev@72 35
dev@226 36 class AggregateStreamingService implements StreamingService {
dev@322 37 private services: Map<LibraryKey, Factory<PiperStreamingService>>;
dev@226 38
dev@226 39 constructor() {
dev@322 40 this.services = new Map<LibraryKey, Factory<PiperStreamingService>>();
dev@322 41 this.services.set(
dev@322 42 'vamp-example-plugins',
dev@322 43 () => new PiperStreamingService(
dev@322 44 new PiperVampService(VampExamplePlugins())
dev@322 45 )
dev@322 46 );
dev@226 47 }
dev@226 48
dev@322 49 addService(key: LibraryKey, service: Factory<PiperStreamingService>): void {
dev@226 50 this.services.set(key, service);
dev@226 51 }
dev@226 52
dev@226 53 list(request: ListRequest): Promise<ListResponse> {
dev@226 54 return Promise.all(
dev@322 55 [...this.services.values()].map(client => client().list({}))
dev@226 56 ).then(allAvailable => ({
dev@226 57 available: allAvailable.reduce(
dev@226 58 (all, current) => all.concat(current.available),
dev@226 59 []
dev@226 60 )
dev@226 61 })
dev@226 62 );
dev@226 63 }
dev@226 64
dev@226 65 process(request: SimpleRequest): Observable<StreamingResponse> {
dev@248 66 return this.dispatch('process', request);
dev@226 67 }
dev@226 68
dev@305 69 protected dispatch(method: 'process',
dev@248 70 request: SimpleRequest): Observable<StreamingResponse> {
dev@226 71 const key = request.key.split(':')[0];
dev@322 72 return this.services.has(key) ? this.services.get(key)()[method](request) :
dev@322 73 Observable.throw('Invalid key');
dev@226 74 }
dev@226 75 }
dev@226 76
dev@250 77 class ThrottledReducingAggregateService extends AggregateStreamingService {
dev@243 78 constructor() {
dev@243 79 super();
dev@243 80 }
dev@243 81
dev@305 82 protected dispatch(method: 'process',
dev@248 83 request: SimpleRequest): Observable<StreamingResponse> {
dev@243 84 let lastPercentagePoint = 0;
dev@305 85 let shouldClear = false;
dev@248 86 return super.dispatch(method, request)
dev@305 87 .scan((acc, value) => {
dev@305 88 if (shouldClear) {
dev@305 89 acc.features = [];
dev@305 90 }
dev@305 91 return streamingResponseReducer(acc, value);
dev@305 92 })
dev@243 93 .filter(val => {
dev@305 94 const progress = val.progress;
dev@243 95 const percentage =
dev@305 96 100 * (progress.processedBlockCount / progress.totalBlockCount) | 0;
dev@243 97 const pointDifference = (percentage - lastPercentagePoint);
dev@249 98 const shouldEmit = pointDifference === 1 || percentage === 100;
dev@249 99 if (shouldEmit) {
dev@243 100 lastPercentagePoint = percentage;
dev@243 101 }
dev@305 102 shouldClear = shouldEmit;
dev@249 103 return shouldEmit;
dev@243 104 });
dev@243 105 }
dev@243 106 }
dev@243 107
dev@40 108 export default class FeatureExtractionWorker {
dev@226 109 private workerScope: DedicatedWorkerGlobalScope;
dev@72 110 private remoteLibraries: Map<LibraryKey, LibraryUri>;
dev@226 111 private server: WebWorkerStreamingServer;
dev@226 112 private service: AggregateStreamingService;
dev@40 113
dev@226 114 constructor(workerScope: DedicatedWorkerGlobalScope,
dev@226 115 private requireJs: RequireJs) {
dev@40 116 this.workerScope = workerScope;
dev@320 117 this.remoteLibraries = new Map<LibraryKey, LibraryUri>([
dev@321 118 // ['nnls-chroma', 'assets/extractors/NNLSChroma.js'],
dev@320 119 ['pyin', 'assets/extractors/PYin.umd.js'],
dev@320 120 ]);
dev@250 121 this.service = new ThrottledReducingAggregateService();
dev@226 122 this.setupImportLibraryListener();
dev@226 123 this.server = new WebWorkerStreamingServer(
dev@226 124 this.workerScope,
dev@226 125 this.service
dev@72 126 );
dev@226 127 }
dev@72 128
dev@226 129 private setupImportLibraryListener(): void {
dev@229 130
dev@44 131 this.workerScope.onmessage = (ev: MessageEvent) => {
dev@64 132 const sendResponse = (result) => {
dev@320 133 console.warn(ev.data.method);
dev@64 134 this.workerScope.postMessage({
dev@64 135 method: ev.data.method,
dev@64 136 result: result
dev@64 137 });
dev@64 138 };
dev@44 139 switch (ev.data.method) {
dev@72 140 case 'import':
dev@72 141 const key: LibraryKey = ev.data.params;
dev@72 142 if (this.remoteLibraries.has(key)) {
dev@72 143 this.requireJs([this.remoteLibraries.get(key)], (plugin) => {
dev@322 144
dev@322 145 const service = () => {
dev@322 146 // TODO a factory with more logic probably belongs in piper-js
dev@322 147 const lib: any | EmscriptenModule = plugin.createLibrary();
dev@322 148 const isEmscriptenModule = typeof lib.cwrap === 'function';
dev@322 149 return new PiperStreamingService(
dev@322 150 isEmscriptenModule ? new PiperVampService(lib) : lib // TODO
dev@322 151 );
dev@322 152 };
dev@229 153 this.service.addService(key, service);
dev@226 154 this.service.list({}).then(sendResponse);
dev@72 155 });
dev@72 156 } else {
dev@72 157 console.error('Non registered library key.'); // TODO handle error
dev@72 158 }
dev@72 159 break;
dev@72 160 case 'addRemoteLibraries': // TODO rename
dev@72 161 const available: AvailableLibraries = ev.data.params;
dev@236 162 Object.keys(available).forEach(libraryKey => {
dev@236 163 this.remoteLibraries.set(libraryKey, available[libraryKey]);
dev@72 164 });
dev@44 165 }
dev@44 166 };
dev@40 167 }
dev@40 168 }