dev@40
|
1 /**
|
dev@40
|
2 * Created by lucas on 01/12/2016.
|
dev@40
|
3 */
|
dev@40
|
4
|
dev@328
|
5 import {PiperVampService, ListRequest, ListResponse, Service} from 'piper';
|
dev@72
|
6 import {
|
dev@226
|
7 SimpleRequest
|
dev@72
|
8 } from 'piper/HigherLevelUtilities';
|
dev@44
|
9 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule';
|
dev@243
|
10 import {
|
dev@243
|
11 AvailableLibraries
|
dev@243
|
12 } from './feature-extraction.service';
|
dev@226
|
13 import {
|
dev@226
|
14 DedicatedWorkerGlobalScope,
|
dev@226
|
15 WebWorkerStreamingServer
|
dev@236
|
16 } from 'piper/servers/WebWorkerStreamingServer';
|
dev@226
|
17 import {
|
dev@226
|
18 PiperStreamingService,
|
dev@226
|
19 StreamingResponse,
|
dev@226
|
20 StreamingService
|
dev@236
|
21 } from 'piper/StreamingService';
|
dev@236
|
22 import {Observable} from 'rxjs/Observable';
|
dev@236
|
23 import {EmscriptenModule} from 'piper/PiperVampService';
|
dev@243
|
24 import {streamingResponseReducer} from './FeatureReducers';
|
dev@40
|
25
|
dev@40
|
/**
 * Minimal local description of a message delivered to the worker.
 *
 * Only the `data` payload is declared; the message handler in this file reads
 * `data.method` and `data.params` from it.
 */
interface MessageEvent {
  readonly data: any;
}
|
dev@40
|
29
|
dev@72
|
/** Location handed to the module loader when fetching a remote library. */
type LibraryUri = string;

/** Identifier under which a library's service factory is registered. */
type LibraryKey = string;

/**
 * Shape of the module loader injected into the worker: it is given the module
 * ids to load, a callback receiving the loaded modules, and an error callback
 * receiving the ids that failed.
 */
type RequireJs = (
  libs: string[],
  callback: (...libs: any[]) => void,
  errBack: (...failedLibIds: string[]) => void
) => void;

/** Zero-argument function that produces a `T` each time it is called. */
type Factory<T> = () => T;
|
dev@72
|
37
|
dev@324
|
/**
 * Runs promise-returning tasks strictly one after another and collects their
 * results in order.
 *
 * A task is only started once the previous one has resolved. If a task fails,
 * either by throwing synchronously or by returning a promise that rejects,
 * the remaining tasks are not started and the returned promise rejects with a
 * {@link QueuedTaskFailure}. That failure exposes the results gathered before
 * the failing task (`previousResponses`) and the original error (`reason`).
 *
 * Results are appended with `Array.prototype.concat`, so a task that resolves
 * to an array contributes its elements rather than the array itself.
 *
 * @param tasks thunks that each start one unit of asynchronous work
 * @returns a promise of all task results, in task order
 */
function waterfall<T>(tasks: (() => Promise<T>)[]): Promise<T[]> {
  return tasks.reduce(
    (runningResponses: Promise<T[]>, nextTask) =>
      runningResponses.then(completed => {
        // Every failure of this step is reported the same way: as a
        // QueuedTaskFailure pointing at the results gathered so far.
        const fail = (cause: any): never => {
          throw new QueuedTaskFailure<T>(runningResponses, undefined, cause);
        };

        let pending: Promise<T>;
        try {
          pending = Promise.resolve(nextTask());
        } catch (e) {
          // The task threw before it could hand back a promise.
          return fail(e);
        }
        // The second argument covers tasks whose promise rejects later on.
        return pending.then(result => completed.concat(result), fail);
      }),
    Promise.resolve<T[]>([])
  );
}

/**
 * Error raised by {@link waterfall} when one of its queued tasks fails.
 */
class QueuedTaskFailure<T> extends Error {
  /** Resolves to the results of the tasks that completed before the failure. */
  public previousResponses: Promise<T[]>;
  /** Whatever the failing task threw or rejected with, if it was recorded. */
  public reason?: any;

  /**
   * @param previousResponses promise of the results gathered before the failure
   * @param message optional message; defaults to 'Queued task failed.'
   * @param reason optional original error raised by the failing task
   */
  constructor(previousResponses: Promise<T[]>, message?: string, reason?: any) {
    super(message || 'Queued task failed.');
    // When compiled to ES5, calling the Error constructor discards the
    // subclass prototype; restore it so `instanceof QueuedTaskFailure` works.
    Object.setPrototypeOf(this, QueuedTaskFailure.prototype);
    this.name = 'QueuedTaskFailure';
    this.previousResponses = previousResponses;
    this.reason = reason;
  }
}
|
dev@449
|
65
|
dev@449
|
/**
 * Merges several list responses into one whose `available` array is the
 * concatenation, in order, of every input's `available` array.
 *
 * Entries that are missing, or that carry no `available` array, are skipped
 * instead of raising a TypeError. This matters for partial results recovered
 * from a failed task queue, which may contain non-response placeholders.
 *
 * @param responses the responses to merge; a missing list is treated as empty
 * @returns a single response containing every available entry
 */
function flattenListResponses(responses: ListResponse[]): ListResponse {
  const usable = (responses || []).filter(
    res => res != null && Array.isArray(res.available)
  );
  return {
    available: usable.reduce(
      (flat, res) => flat.concat(res.available),
      []
    )
  };
}
|
dev@449
|
74
|
dev@226
|
/**
 * A streaming service that fronts several library-specific streaming
 * services, each registered as a factory under a library key.
 *
 * The bundled 'vamp-example-plugins' library is registered on construction;
 * further libraries are added through {@link addService}. A fresh client is
 * created from the relevant factory for every list or process call.
 */
class AggregateStreamingService implements StreamingService {
  private services: Map<LibraryKey, Factory<PiperStreamingService>>;

  constructor() {
    const bundledExamples: Factory<PiperStreamingService> = () => {
      const vampService = new PiperVampService(VampExamplePlugins());
      return new PiperStreamingService(vampService);
    };
    this.services = new Map<LibraryKey, Factory<PiperStreamingService>>();
    this.services.set('vamp-example-plugins', bundledExamples);
  }

  /**
   * Registers (or replaces) the client factory used for the given library key.
   */
  addService(key: LibraryKey, service: Factory<PiperStreamingService>): void {
    this.services.set(key, service);
  }

  /**
   * Lists what every registered library offers, querying the libraries one at
   * a time and merging the answers into a single response.
   *
   * Note that each library is asked with an empty request; the `request`
   * argument itself is not forwarded.
   */
  list(request: ListRequest): Promise<ListResponse> {
    const listThunks: (() => Promise<ListResponse>)[] = [];
    this.services.forEach(createClient => {
      listThunks.push(() => createClient().list({}));
    });
    return waterfall(listThunks)
      .then(responses => flattenListResponses(responses));
  }

  /**
   * Processes the request with the library that owns the requested key.
   */
  process(request: SimpleRequest): Observable<StreamingResponse> {
    return this.dispatch('process', request);
  }

  /**
   * Routes a request to the library named by the part of `request.key` before
   * the first ':'. Yields an observable that errors with 'Invalid key' when no
   * library is registered under that name.
   */
  protected dispatch(method: 'process',
                     request: SimpleRequest): Observable<StreamingResponse> {
    const libraryKey = request.key.split(':')[0];
    if (!this.services.has(libraryKey)) {
      return Observable.throw('Invalid key');
    }
    const createClient = this.services.get(libraryKey);
    return createClient()[method](request);
  }
}
|
dev@226
|
111
|
dev@250
|
/**
 * Aggregate service that thins out the stream of responses: incoming
 * responses are folded together with `streamingResponseReducer`, and a
 * combined response is only let through when progress has advanced by at
 * least one whole percentage point since the last emission, or has reached
 * 100%.
 */
class ThrottledReducingAggregateService extends AggregateStreamingService {
  constructor() {
    super();
  }

  /**
   * Dispatches as the base class does, then reduces and throttles the
   * resulting stream.
   */
  protected dispatch(method: 'process',
                     request: SimpleRequest): Observable<StreamingResponse> {
    // Both variables are per-call state shared by the two operators below.
    let lastPercentagePoint = 0;
    let shouldClear = false;
    return super.dispatch(method, request)
      .scan((acc, value) => {
        // The previous accumulated response was emitted, so start collecting
        // features afresh before folding in the next response.
        if (shouldClear) {
          acc.features = [];
        }
        return streamingResponseReducer(acc, value);
      })
      .filter(val => {
        const progress = val.progress;
        // `| 0` truncates the percentage to a whole number.
        const percentage =
          100 * (progress.processedBlockCount / progress.totalBlockCount) | 0;
        const pointDifference = (percentage - lastPercentagePoint);
        // `>= 1` rather than `=== 1`: when a single response advances progress
        // by more than one point (few blocks in total), an exact match would
        // never occur and nothing would be emitted before 100%.
        const shouldEmit = pointDifference >= 1 || percentage === 100;
        if (shouldEmit) {
          lastPercentagePoint = percentage;
        }
        shouldClear = shouldEmit;
        return shouldEmit;
      });
  }
}
|
dev@243
|
142
|
dev@40
|
/**
 * Worker-side entry point for feature extraction.
 *
 * Wires an aggregate streaming service to a `WebWorkerStreamingServer` on the
 * given worker scope, and listens for 'addRemoteLibraries' messages asking it
 * to load further libraries through the injected module loader.
 */
export default class FeatureExtractionWorker {
  private workerScope: DedicatedWorkerGlobalScope;
  private server: WebWorkerStreamingServer;
  private service: AggregateStreamingService;

  /**
   * @param workerScope the worker global scope to receive and post messages on
   * @param requireJs loader used to fetch remote library modules
   */
  constructor(workerScope: DedicatedWorkerGlobalScope,
              private requireJs: RequireJs) {
    this.workerScope = workerScope;
    this.service = new ThrottledReducingAggregateService();
    // The listener is installed before the server is created, as before.
    this.setupImportLibraryListener();
    this.server = new WebWorkerStreamingServer(
      this.workerScope,
      this.service
    );
  }

  /** Best-effort human-readable text for a rejection value of any type. */
  private static describeFailure(e: any): string {
    return e && e.message ? e.message : String(e);
  }

  /**
   * Installs the message handler that reacts to 'addRemoteLibraries' requests;
   * messages with any other method are ignored by this handler.
   */
  private setupImportLibraryListener(): void {
    this.workerScope.onmessage = (ev: MessageEvent) => {
      if (ev.data.method !== 'addRemoteLibraries') { // TODO rename
        return;
      }
      this.importRemoteLibraries(ev.data.params);
    };
  }

  /**
   * Loads the given libraries one after another, registers each with the
   * aggregate service, then posts an 'import' message carrying the combined
   * listing.
   *
   * An 'import' message is posted even when something fails:
   * - if loading stops part-way, the libraries registered so far are listed;
   * - if listing fails, the partial listing attached to the failure is posted
   *   when there is one, otherwise an empty listing.
   */
  private importRemoteLibraries(available: AvailableLibraries): void {
    const importThunks = Object.keys(available).map(libraryKey => () =>
      this.downloadRemoteLibrary(libraryKey, available[libraryKey])
        .then(createService => {
          this.service.addService(
            libraryKey,
            () => new PiperStreamingService(createService())
          );
        })
    );

    waterfall(importThunks)
      .catch(e => {
        // The import tasks resolve to nothing, so their partial results are
        // not list responses; recover by listing whatever got registered.
        console.warn(
          `${FeatureExtractionWorker.describeFailure(e)}. Try using results so far`
        );
      })
      .then(() => this.service.list({}))
      .then(
        response => this.postImportResult(response),
        e => {
          console.warn(
            `${FeatureExtractionWorker.describeFailure(e)}. Try using results so far`
          );
          // Not every failure carries partial results (e.g. a plain
          // rejection), so check before relying on them.
          const previous = e ? e.previousResponses : undefined;
          const partial: Promise<ListResponse> =
            previous && typeof previous.then === 'function'
              ? previous.then(flattenListResponses)
              : Promise.resolve({available: []});
          return partial.then(result => this.postImportResult(result));
        }
      );
  }

  /** Posts the outcome of an import request back over the worker scope. */
  private postImportResult(result: ListResponse): void {
    this.workerScope.postMessage({
      method: 'import',
      result: result
    });
  }

  /**
   * Fetches a remote library module through the injected loader.
   *
   * @param key library key, used only in the failure message
   * @param uri location passed to the loader
   * @returns a promise of a factory; each call to the factory invokes the
   *   loaded module and, if the result looks like an Emscripten module (it
   *   has a `cwrap` function), wraps it in a `PiperVampService`
   */
  private downloadRemoteLibrary(key: LibraryKey,
                                uri: LibraryUri): Promise<Factory<Service>> {
    return new Promise<Factory<Service>>((res, rej) => {
      this.requireJs([uri], (createModule) => {
        res(() => {
          // TODO a factory with more logic probably belongs in piper-js
          const lib: any | EmscriptenModule = createModule();
          const isEmscriptenModule = typeof lib.cwrap === 'function';
          return isEmscriptenModule ? new PiperVampService(lib) : lib; // TODO
        });
      }, () => {
        // Reject with an Error (not a bare string) so handlers can read
        // `.message` and get a stack trace.
        rej(new Error(`Failed to load ${key} remote module.`));
      });
    });
  }
}
|