comparison src/app/services/feature-extraction/FeatureExtractionWorker.ts @ 324:e433a2da0ada

Refactor the import library logic slightly to waterfall the loading of the libraries and list requests, and send one response when all libraries have been loaded.
author Lucas Thompson <dev@lucas.im>
date Tue, 16 May 2017 16:16:57 +0100
parents 72673c954216
children fab49fd10f35
comparison
equal deleted inserted replaced
323:72673c954216 324:e433a2da0ada
1 /** 1 /**
2 * Created by lucas on 01/12/2016. 2 * Created by lucas on 01/12/2016.
3 */ 3 */
4 4
5 import {PiperVampService, ListRequest, ListResponse, Service} from 'piper'; 5 import {PiperVampService, ListRequest, ListResponse} from 'piper';
6 import { 6 import {
7 SimpleRequest 7 SimpleRequest
8 } from 'piper/HigherLevelUtilities'; 8 } from 'piper/HigherLevelUtilities';
9 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule'; 9 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule';
10 import { 10 import {
31 type LibraryKey = string; 31 type LibraryKey = string;
32 32
33 type RequireJs = (libs: string[], callback: (...libs: any[]) => void) => void; 33 type RequireJs = (libs: string[], callback: (...libs: any[]) => void) => void;
34 type Factory<T> = () => T; 34 type Factory<T> = () => T;
35 35
36 function waterfall<T>(tasks: (() => Promise<T>)[]): Promise<T[]> {
37 const reducer = (running: T[], next: Promise<T>): Promise<T[]> => {
38 return next.then(response => {
39 running = running.concat(response);
40 return running;
41 });
42 };
43
44 return tasks.reduce((runningResponses, nextResponse) => {
45 return runningResponses.then(response => {
46 return reducer(response, nextResponse());
47 })
48 }, Promise.resolve([]));
49 }
50
36 class AggregateStreamingService implements StreamingService { 51 class AggregateStreamingService implements StreamingService {
37 private services: Map<LibraryKey, Factory<PiperStreamingService>>; 52 private services: Map<LibraryKey, Factory<PiperStreamingService>>;
38 53
39 constructor() { 54 constructor() {
40 this.services = new Map<LibraryKey, Factory<PiperStreamingService>>(); 55 this.services = new Map<LibraryKey, Factory<PiperStreamingService>>();
48 63
49 addService(key: LibraryKey, service: Factory<PiperStreamingService>): void { 64 addService(key: LibraryKey, service: Factory<PiperStreamingService>): void {
50 this.services.set(key, service); 65 this.services.set(key, service);
51 } 66 }
52 67
68 hasRemoteService(key: LibraryKey): boolean {
69 return this.services.has(key);
70 }
71
53 list(request: ListRequest): Promise<ListResponse> { 72 list(request: ListRequest): Promise<ListResponse> {
54 //TODO refactor
55 const listThunks: (() => Promise<ListResponse>)[] = [ 73 const listThunks: (() => Promise<ListResponse>)[] = [
56 ...this.services.values() 74 ...this.services.values()
57 ].map(client => () => client().list({})); 75 ].map(client => () => client().list({}));
58 76
59 const concatAvailable = (running: ListResponse, 77 return waterfall(listThunks).then(responses => {
60 nextResponse: Promise<ListResponse>) 78 return responses.reduce((allAvailable, res) => {
61 : Promise<ListResponse> => { 79 allAvailable.available = allAvailable.available.concat(res.available);
62 return nextResponse.then(response => { 80 return allAvailable;
63 running.available = running.available.concat(response.available); 81 }, {available: []});
64 return running; 82 })
65 });
66 };
67
68 return listThunks.reduce((runningResponses, nextResponse) => {
69 return runningResponses.then(response => {
70 return concatAvailable(response, nextResponse());
71 })
72 }, Promise.resolve({available: []}));
73 } 83 }
74 84
75 process(request: SimpleRequest): Observable<StreamingResponse> { 85 process(request: SimpleRequest): Observable<StreamingResponse> {
76 return this.dispatch('process', request); 86 return this.dispatch('process', request);
77 } 87 }
122 private service: AggregateStreamingService; 132 private service: AggregateStreamingService;
123 133
124 constructor(workerScope: DedicatedWorkerGlobalScope, 134 constructor(workerScope: DedicatedWorkerGlobalScope,
125 private requireJs: RequireJs) { 135 private requireJs: RequireJs) {
126 this.workerScope = workerScope; 136 this.workerScope = workerScope;
127 this.remoteLibraries = new Map<LibraryKey, LibraryUri>([ 137 this.remoteLibraries = new Map<LibraryKey, LibraryUri>();
128 // ['nnls-chroma', 'assets/extractors/NNLSChroma.js'],
129 ['pyin', 'assets/extractors/PYin.umd.js'],
130 ]);
131 this.service = new ThrottledReducingAggregateService(); 138 this.service = new ThrottledReducingAggregateService();
132 this.setupImportLibraryListener(); 139 this.setupImportLibraryListener();
133 this.server = new WebWorkerStreamingServer( 140 this.server = new WebWorkerStreamingServer(
134 this.workerScope, 141 this.workerScope,
135 this.service 142 this.service
137 } 144 }
138 145
139 private setupImportLibraryListener(): void { 146 private setupImportLibraryListener(): void {
140 147
141 this.workerScope.onmessage = (ev: MessageEvent) => { 148 this.workerScope.onmessage = (ev: MessageEvent) => {
142 const sendResponse = (result) => {
143 console.warn(ev.data.method, ev.data);
144 this.workerScope.postMessage({
145 method: ev.data.method,
146 result: result
147 });
148 };
149 switch (ev.data.method) { 149 switch (ev.data.method) {
150 case 'import':
151 const key: LibraryKey = ev.data.params;
152 if (this.remoteLibraries.has(key)) {
153 this.requireJs([this.remoteLibraries.get(key)], (plugin) => {
154
155 const service = () => {
156 // TODO a factory with more logic probably belongs in piper-js
157 const lib: any | EmscriptenModule = plugin.createLibrary();
158 const isEmscriptenModule = typeof lib.cwrap === 'function';
159 return new PiperStreamingService(
160 isEmscriptenModule ? new PiperVampService(lib) : lib // TODO
161 );
162 };
163 this.service.addService(key, service);
164 this.service.list({}).then(sendResponse);
165 });
166 } else {
167 console.error('Non registered library key.'); // TODO handle error
168 }
169 break;
170 case 'addRemoteLibraries': // TODO rename 150 case 'addRemoteLibraries': // TODO rename
171 const available: AvailableLibraries = ev.data.params; 151 const available: AvailableLibraries = ev.data.params;
172 Object.keys(available).forEach(libraryKey => { 152 const importThunks = Object.keys(available).map(libraryKey => {
173 this.remoteLibraries.set(libraryKey, available[libraryKey]); 153 return () => {
154 this.remoteLibraries.set(libraryKey, available[libraryKey]);
155 return this.import(libraryKey).then(key => {
156 return key;
157 });
158 };
174 }); 159 });
160 waterfall(importThunks).then(() => {
161 this.service.list({}).then(response => {
162 this.workerScope.postMessage({
163 method: 'import',
164 result: response
165 });
166 });
167 })
175 } 168 }
176 }; 169 };
177 } 170 }
171
172 private import(key: LibraryKey): Promise<LibraryKey> { // TODO return type?
173 return new Promise((res, rej) => {
174 if (this.remoteLibraries.has(key)) {
175 // TODO RequireJs can fail... need to reject the promise then
176 this.requireJs([this.remoteLibraries.get(key)], (plugin) => {
177
178 const service = () => {
179 // TODO a factory with more logic probably belongs in piper-js
180 const lib: any | EmscriptenModule = plugin.createLibrary();
181 const isEmscriptenModule = typeof lib.cwrap === 'function';
182 return new PiperStreamingService(
183 isEmscriptenModule ? new PiperVampService(lib) : lib // TODO
184 );
185 };
186 this.service.addService(key, service);
187 res(key);
188 });
189 } else {
190 rej('Invalid remote library key');
191 }
192 });
193 }
178 } 194 }