comparison src/app/services/feature-extraction/FeatureExtractionWorker.ts @ 328:676c4d6d35f7

Small tidy up, some formatting changes and removal of unnecessary properties.
author Lucas Thompson <dev@lucas.im>
date Wed, 17 May 2017 12:56:39 +0100
parents fab49fd10f35
children dc7237d84f8d
comparison
equal deleted inserted replaced
327:fab49fd10f35 328:676c4d6d35f7
1 /** 1 /**
2 * Created by lucas on 01/12/2016. 2 * Created by lucas on 01/12/2016.
3 */ 3 */
4 4
5 import {PiperVampService, ListRequest, ListResponse} from 'piper'; 5 import {PiperVampService, ListRequest, ListResponse, Service} from 'piper';
6 import { 6 import {
7 SimpleRequest 7 SimpleRequest
8 } from 'piper/HigherLevelUtilities'; 8 } from 'piper/HigherLevelUtilities';
9 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule'; 9 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule';
10 import { 10 import {
28 } 28 }
29 29
30 type LibraryUri = string; 30 type LibraryUri = string;
31 type LibraryKey = string; 31 type LibraryKey = string;
32 32
33 type RequireJs = (libs: string[], callback: (...libs: any[]) => void) => void; 33 type RequireJs = (libs: string[],
34 callback: (...libs: any[]) => void,
35 errBack: (...failedLibIds: string[]) => void) => void;
34 type Factory<T> = () => T; 36 type Factory<T> = () => T;
35 37
36 function waterfall<T>(tasks: (() => Promise<T>)[]): Promise<T[]> { 38 function waterfall<T>(tasks: (() => Promise<T>)[]): Promise<T[]> {
37 const reducer = (running: T[], next: Promise<T>): Promise<T[]> => { 39 const reducer = (running: T[], next: Promise<T>): Promise<T[]> => {
38 return next.then(response => { 40 return next.then(response => {
42 }; 44 };
43 45
44 return tasks.reduce((runningResponses, nextResponse) => { 46 return tasks.reduce((runningResponses, nextResponse) => {
45 return runningResponses.then(response => { 47 return runningResponses.then(response => {
46 return reducer(response, nextResponse()); 48 return reducer(response, nextResponse());
47 }) 49 });
48 }, Promise.resolve([])); 50 }, Promise.resolve([]));
49 } 51 }
50 52
51 class AggregateStreamingService implements StreamingService { 53 class AggregateStreamingService implements StreamingService {
52 private services: Map<LibraryKey, Factory<PiperStreamingService>>; 54 private services: Map<LibraryKey, Factory<PiperStreamingService>>;
63 65
64 addService(key: LibraryKey, service: Factory<PiperStreamingService>): void { 66 addService(key: LibraryKey, service: Factory<PiperStreamingService>): void {
65 this.services.set(key, service); 67 this.services.set(key, service);
66 } 68 }
67 69
68 hasRemoteService(key: LibraryKey): boolean {
69 return this.services.has(key);
70 }
71
72 list(request: ListRequest): Promise<ListResponse> { 70 list(request: ListRequest): Promise<ListResponse> {
73 const listThunks: (() => Promise<ListResponse>)[] = [ 71 const listThunks: (() => Promise<ListResponse>)[] = [
74 ...this.services.values() 72 ...this.services.values()
75 ].map(client => () => client().list({})); 73 ].map(createClient => () => createClient().list({}));
76 74
77 return waterfall(listThunks).then(responses => { 75 return waterfall(listThunks).then(responses => ({
78 return responses.reduce((allAvailable, res) => { 76 available: responses.reduce((flat, res) => flat.concat(res.available), [])
79 allAvailable.available = allAvailable.available.concat(res.available); 77 }));
80 return allAvailable;
81 }, {available: []});
82 })
83 } 78 }
84 79
85 process(request: SimpleRequest): Observable<StreamingResponse> { 80 process(request: SimpleRequest): Observable<StreamingResponse> {
86 return this.dispatch('process', request); 81 return this.dispatch('process', request);
87 } 82 }
125 } 120 }
126 } 121 }
127 122
128 export default class FeatureExtractionWorker { 123 export default class FeatureExtractionWorker {
129 private workerScope: DedicatedWorkerGlobalScope; 124 private workerScope: DedicatedWorkerGlobalScope;
130 private remoteLibraries: Map<LibraryKey, LibraryUri>;
131 private server: WebWorkerStreamingServer; 125 private server: WebWorkerStreamingServer;
132 private service: AggregateStreamingService; 126 private service: AggregateStreamingService;
133 127
134 constructor(workerScope: DedicatedWorkerGlobalScope, 128 constructor(workerScope: DedicatedWorkerGlobalScope,
135 private requireJs: RequireJs) { 129 private requireJs: RequireJs) {
136 this.workerScope = workerScope; 130 this.workerScope = workerScope;
137 this.remoteLibraries = new Map<LibraryKey, LibraryUri>();
138 this.service = new ThrottledReducingAggregateService(); 131 this.service = new ThrottledReducingAggregateService();
139 this.setupImportLibraryListener(); 132 this.setupImportLibraryListener();
140 this.server = new WebWorkerStreamingServer( 133 this.server = new WebWorkerStreamingServer(
141 this.workerScope, 134 this.workerScope,
142 this.service 135 this.service
149 switch (ev.data.method) { 142 switch (ev.data.method) {
150 case 'addRemoteLibraries': // TODO rename 143 case 'addRemoteLibraries': // TODO rename
151 const available: AvailableLibraries = ev.data.params; 144 const available: AvailableLibraries = ev.data.params;
152 const importThunks = Object.keys(available).map(libraryKey => { 145 const importThunks = Object.keys(available).map(libraryKey => {
153 return () => { 146 return () => {
154 this.remoteLibraries.set(libraryKey, available[libraryKey]); 147 return this.downloadRemoteLibrary(
155 return this.import(libraryKey); 148 libraryKey,
149 available[libraryKey]
150 ).then(createService => {
151 this.service.addService(libraryKey,
152 () => new PiperStreamingService(
153 createService()
154 ));
155 });
156 }; 156 };
157 }); 157 });
158 waterfall(importThunks).then(() => { 158 waterfall(importThunks)
159 this.service.list({}).then(response => { 159 .then(() => this.service.list({}))
160 .then(response => {
160 this.workerScope.postMessage({ 161 this.workerScope.postMessage({
161 method: 'import', 162 method: 'import',
162 result: response 163 result: response
163 }); 164 });
164 }); 165 });
165 })
166 } 166 }
167 }; 167 };
168 } 168 }
169 169
170 private import(key: LibraryKey): Promise<LibraryKey> { // TODO return type? 170 private downloadRemoteLibrary(key: LibraryKey,
171 uri: LibraryUri): Promise<Factory<Service>> {
171 return new Promise((res, rej) => { 172 return new Promise((res, rej) => {
172 if (this.remoteLibraries.has(key)) { 173 this.requireJs([uri], (plugin) => {
173 // TODO RequireJs can fail... need to reject the promise then 174 res(() => {
174 this.requireJs([this.remoteLibraries.get(key)], (plugin) => { 175 // TODO a factory with more logic probably belongs in piper-js
175 176 const lib: any | EmscriptenModule = plugin.createLibrary();
176 const service = () => { 177 const isEmscriptenModule = typeof lib.cwrap === 'function';
177 // TODO a factory with more logic probably belongs in piper-js 178 return isEmscriptenModule ? new PiperVampService(lib) : lib; // TODO
178 const lib: any | EmscriptenModule = plugin.createLibrary();
179 const isEmscriptenModule = typeof lib.cwrap === 'function';
180 return new PiperStreamingService(
181 isEmscriptenModule ? new PiperVampService(lib) : lib // TODO
182 );
183 };
184 this.service.addService(key, service);
185 res(key);
186 }); 179 });
187 } else { 180 }, (err) => {
188 rej('Invalid remote library key'); 181 rej(`Failed to load ${key} remote module.`);
189 } 182 });
190 }); 183 });
191 } 184 }
192 } 185 }