annotate src/app/services/feature-extraction/FeatureExtractionWorker.ts @ 323:72673c954216

Try chaining the list requests.
author Lucas Thompson <dev@lucas.im>
date Tue, 16 May 2017 11:15:43 +0100
parents 38886ce7e2e5
children e433a2da0ada
rev   line source
dev@40 1 /**
dev@40 2 * Created by lucas on 01/12/2016.
dev@40 3 */
dev@40 4
dev@322 5 import {PiperVampService, ListRequest, ListResponse, Service} from 'piper';
dev@72 6 import {
dev@226 7 SimpleRequest
dev@72 8 } from 'piper/HigherLevelUtilities';
dev@44 9 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule';
dev@243 10 import {
dev@243 11 AvailableLibraries
dev@243 12 } from './feature-extraction.service';
dev@226 13 import {
dev@226 14 DedicatedWorkerGlobalScope,
dev@226 15 WebWorkerStreamingServer
dev@236 16 } from 'piper/servers/WebWorkerStreamingServer';
dev@226 17 import {
dev@226 18 PiperStreamingService,
dev@226 19 StreamingResponse,
dev@226 20 StreamingService
dev@236 21 } from 'piper/StreamingService';
dev@236 22 import {Observable} from 'rxjs/Observable';
dev@236 23 import {EmscriptenModule} from 'piper/PiperVampService';
dev@243 24 import {streamingResponseReducer} from './FeatureReducers';
dev@40 25
/**
 * Minimal local description of a message delivered to the worker.
 * Only the `data` payload is read by the code in this file.
 */
interface MessageEvent {
  readonly data: any;
}
dev@40 29
/** Location (passed to the module loader) from which a library is fetched. */
type LibraryUri = string;

/** Name under which a library and its service factory are registered. */
type LibraryKey = string;

/**
 * Module loader signature: takes the list of libraries to load and a
 * callback that receives the loaded modules as its arguments.
 */
type RequireJs = (
  libs: string[],
  callback: (...libs: any[]) => void
) => void;

/** Zero-argument function producing a `T` each time it is called. */
type Factory<T> = () => T;
dev@72 35
/**
 * Presents several Piper streaming services, each registered under a library
 * key, as one StreamingService.
 *
 * Services are held as factories rather than instances, so a fresh
 * PiperStreamingService is created for every listing and every dispatched
 * request.
 */
class AggregateStreamingService implements StreamingService {
  private services: Map<LibraryKey, Factory<PiperStreamingService>>;

  constructor() {
    // The example plugin library is imported from piper itself, so it is
    // registered up front rather than via addService.
    const createExamplePlugins: Factory<PiperStreamingService> = () => {
      const vampService = new PiperVampService(VampExamplePlugins());
      return new PiperStreamingService(vampService);
    };
    this.services = new Map<LibraryKey, Factory<PiperStreamingService>>();
    this.services.set('vamp-example-plugins', createExamplePlugins);
  }

  /**
   * Registers (or replaces) the factory used for the given library key.
   *
   * @param key library key that request keys are matched against
   * @param service factory producing the streaming service for that library
   */
  addService(key: LibraryKey, service: Factory<PiperStreamingService>): void {
    this.services.set(key, service);
  }

  /**
   * Lists what every registered service has available, merged into a single
   * response in registration order.
   *
   * The services are queried one after another: the next service is only
   * created and asked once the previous one has answered.
   *
   * @param request currently not forwarded; each service is queried with `{}`
   * @returns a response whose `available` is the concatenation of all results
   */
  list(request: ListRequest): Promise<ListResponse> {
    // TODO refactor
    const factories = [...this.services.values()];
    let pending: Promise<ListResponse> = Promise.resolve({available: []});
    for (const createService of factories) {
      pending = pending.then(combined =>
        createService().list({}).then(response => {
          combined.available = combined.available.concat(response.available);
          return combined;
        })
      );
    }
    return pending;
  }

  /**
   * Runs a process request against the service that owns the request's key.
   *
   * @param request the extraction request; its key selects the library
   * @returns the stream of responses produced by the selected service
   */
  process(request: SimpleRequest): Observable<StreamingResponse> {
    return this.dispatch('process', request);
  }

  /**
   * Routes a request to the service registered under the part of
   * `request.key` that precedes the first ':'.
   *
   * @param method name of the service method to invoke
   * @param request the request handed to that method
   * @returns the service's response stream, or a stream that errors with
   *          'Invalid key' when no service is registered for the key
   */
  protected dispatch(method: 'process',
                     request: SimpleRequest): Observable<StreamingResponse> {
    const [libraryKey] = request.key.split(':');
    if (!this.services.has(libraryKey)) {
      return Observable.throw('Invalid key');
    }
    const createService = this.services.get(libraryKey);
    return createService()[method](request);
  }
}
dev@226 86
/**
 * Aggregate service that folds the responses of a dispatched request together
 * with `streamingResponseReducer` and only lets a response through when the
 * progress has advanced by at least one whole percentage point (or has
 * reached 100%).
 */
class ThrottledReducingAggregateService extends AggregateStreamingService {

  /**
   * Dispatches the request to the owning service and throttles its output.
   *
   * @param method name of the service method to invoke
   * @param request the request handed to that method
   * @returns a stream of reduced responses, emitted at most once per whole
   *          percentage point of progress, plus every response at 100%
   */
  protected dispatch(method: 'process',
                     request: SimpleRequest): Observable<StreamingResponse> {
    // Both variables are per-call state shared between the scan and filter
    // stages below.
    let lastPercentagePoint = 0;
    let shouldClear = false;
    return super.dispatch(method, request)
      .scan((acc, value) => {
        // The previous accumulated response was emitted, so start collecting
        // features afresh instead of re-sending the ones already delivered.
        if (shouldClear) {
          acc.features = [];
        }
        return streamingResponseReducer(acc, value);
      })
      .filter(val => {
        const progress = val.progress;
        // `| 0` truncates to an integer percentage.
        // NOTE(review): a totalBlockCount of 0 yields NaN/Infinity, which
        // `| 0` turns into 0, so nothing would ever be emitted - confirm the
        // upstream service always reports a non-zero total.
        const percentage =
          100 * (progress.processedBlockCount / progress.totalBlockCount) | 0;
        const pointDifference = percentage - lastPercentagePoint;
        // Use >= rather than === 1: when there are fewer than 100 blocks the
        // percentage jumps by more than one point per response, and an exact
        // comparison would suppress every emission until 100%.
        const shouldEmit = pointDifference >= 1 || percentage === 100;
        if (shouldEmit) {
          lastPercentagePoint = percentage;
        }
        shouldClear = shouldEmit;
        return shouldEmit;
      });
  }
}
dev@243 117
/**
 * Worker-side entry point for feature extraction.
 *
 * Wires an aggregate streaming service to a WebWorkerStreamingServer on the
 * given worker scope, and additionally handles two messages itself:
 * 'import' (load a known remote library and register it with the service)
 * and 'addRemoteLibraries' (extend the table of known remote libraries).
 */
export default class FeatureExtractionWorker {
  private workerScope: DedicatedWorkerGlobalScope;
  private remoteLibraries: Map<LibraryKey, LibraryUri>;
  private server: WebWorkerStreamingServer;
  private service: AggregateStreamingService;

  /**
   * @param workerScope the worker global scope to receive and post messages on
   * @param requireJs loader used to fetch remote libraries by URI
   */
  constructor(workerScope: DedicatedWorkerGlobalScope,
              private requireJs: RequireJs) {
    this.workerScope = workerScope;
    this.remoteLibraries = new Map<LibraryKey, LibraryUri>([
      // ['nnls-chroma', 'assets/extractors/NNLSChroma.js'],
      ['pyin', 'assets/extractors/PYin.umd.js'],
    ]);
    this.service = new ThrottledReducingAggregateService();
    // The listener is installed before the server is constructed; keep this
    // order, the server is handed the same worker scope.
    this.setupImportLibraryListener();
    this.server = new WebWorkerStreamingServer(
      this.workerScope,
      this.service
    );
  }

  /**
   * Installs the `onmessage` handler that services 'import' and
   * 'addRemoteLibraries' messages. Messages with any other method are
   * ignored by this handler.
   */
  private setupImportLibraryListener(): void {
    this.workerScope.onmessage = (ev: MessageEvent) => {
      // Replies on the worker scope, echoing the method of the request.
      const sendResponse = (result: ListResponse) => {
        console.warn(ev.data.method, ev.data);
        this.workerScope.postMessage({
          method: ev.data.method,
          result: result
        });
      };

      switch (ev.data.method) {
        case 'import': {
          const key: LibraryKey = ev.data.params;
          if (!this.remoteLibraries.has(key)) {
            console.error('Non registered library key.'); // TODO handle error
            break;
          }
          this.requireJs([this.remoteLibraries.get(key)], (plugin) => {
            const createService = () => {
              // TODO a factory with more logic probably belongs in piper-js
              const lib: any | EmscriptenModule = plugin.createLibrary();
              const isEmscriptenModule = typeof lib.cwrap === 'function';
              return new PiperStreamingService(
                isEmscriptenModule ? new PiperVampService(lib) : lib // TODO
              );
            };
            this.service.addService(key, createService);
            // Answer the import with the refreshed listing. A failed listing
            // is logged rather than left as an unhandled rejection.
            this.service.list({})
              .then(sendResponse)
              .catch(error => {
                console.error('Failed to list available extractors.', error);
              });
          });
          break;
        }
        case 'addRemoteLibraries': { // TODO rename
          const available: AvailableLibraries = ev.data.params;
          Object.keys(available).forEach(libraryKey => {
            this.remoteLibraries.set(libraryKey, available[libraryKey]);
          });
          break;
        }
      }
    };
  }
}