dev@40
|
1 /**
|
dev@40
|
2 * Created by lucas on 01/12/2016.
|
dev@40
|
3 */
|
dev@40
|
4
|
dev@497
|
5 import {ListRequest, ListResponse, Service} from 'piper-js/core';
|
dev@497
|
6 import {EmscriptenService as PiperVampService} from 'piper-js/emscripten';
|
dev@72
|
7 import {
|
dev@497
|
8 OneShotExtractionRequest as SimpleRequest,
|
dev@497
|
9 } from 'piper-js/one-shot';
|
dev@497
|
10 import { VampExamplePlugins } from 'piper-js/ext/VampExamplePluginsModule';
|
dev@243
|
11 import {
|
dev@243
|
12 AvailableLibraries
|
dev@243
|
13 } from './feature-extraction.service';
|
dev@226
|
14 import {
|
dev@226
|
15 DedicatedWorkerGlobalScope,
|
dev@226
|
16 WebWorkerStreamingServer
|
dev@497
|
17 } from 'piper-js/web-worker';
|
dev@226
|
18 import {
|
dev@226
|
19 PiperStreamingService,
|
dev@226
|
20 StreamingResponse,
|
dev@226
|
21 StreamingService
|
dev@497
|
22 } from 'piper-js/streaming';
|
dev@236
|
23 import {Observable} from 'rxjs/Observable';
|
dev@497
|
24 import {EmscriptenModule} from 'piper-js/emscripten';
|
dev@243
|
25 import {streamingResponseReducer} from './FeatureReducers';
|
dev@40
|
26
|
dev@40
|
/**
 * Minimal view of a message delivered to the worker: only the payload is
 * read by this module.
 */
interface MessageEvent {
  readonly data: any;
}

/** Location handed to the module loader when fetching a remote library. */
type LibraryUri = string;

/** Name under which a library's service factory is registered. */
type LibraryKey = string;

/** Zero-argument function producing a `T` each time it is invoked. */
type Factory<T> = () => T;

/**
 * Signature of the module loader supplied to the worker. It is given the
 * libraries to load, a callback receiving the loaded modules, and an error
 * callback receiving the ids of the libraries which failed.
 */
type RequireJs = (
  libs: string[],
  callback: (...libs: any[]) => void,
  errBack: (...failedLibIds: string[]) => void
) => void;
|
dev@72
|
38
|
dev@324
|
/**
 * Error used by `waterfall` to signal that one of its queued tasks failed.
 *
 * `previousResponses` resolves to the responses of every task which completed
 * before the failing one, so callers can fall back to partial results.
 * `cause` carries whatever the failing task threw or rejected with, when known.
 */
class QueuedTaskFailure<T> extends Error {
  public previousResponses: Promise<T[]>;
  public cause?: any;

  constructor(previousResponses: Promise<T[]>, message?: string, cause?: any) {
    super(message || 'Queued task failed.');
    // Subclasses of Error lose their prototype chain when compiled to ES5,
    // which breaks `instanceof` checks; restore it explicitly.
    Object.setPrototypeOf(this, QueuedTaskFailure.prototype);
    this.previousResponses = previousResponses;
    this.cause = cause;
  }
}

/**
 * Run asynchronous tasks strictly one after another, collecting their results.
 *
 * Each task is only started once the previous one has resolved. The first
 * task which fails stops the queue; later tasks are never started.
 *
 * @param tasks thunks which each start one unit of asynchronous work
 * @returns a promise of every task's response, in task order. It rejects with
 *   a `QueuedTaskFailure` when a task throws synchronously OR rejects
 *   asynchronously.
 */
function waterfall<T>(tasks: (() => Promise<T>)[]): Promise<T[]> {
  return tasks.reduce(
    (runningResponses: Promise<T[]>, startTask: () => Promise<T>) => {
      return runningResponses.then(responses => {
        // Starting the task inside a Promise executor turns a synchronous
        // throw into a rejection, so both failure modes share one handler.
        const pending = new Promise<T>(resolve => resolve(startTask()));
        return pending.then(
          // Wrap in an array so that array-valued responses are appended
          // as a single entry rather than being flattened by concat.
          response => responses.concat([response]),
          (reason: any): T[] => {
            throw new QueuedTaskFailure<T>(runningResponses, undefined, reason);
          }
        );
      });
    },
    Promise.resolve<T[]>([])
  );
}
|
dev@449
|
66
|
dev@449
|
/**
 * Combine several list responses into a single one.
 *
 * @param responses the responses to merge, in the order they should appear
 * @returns a response whose `available` is the concatenation of every input
 *   response's `available` (empty when no responses are given)
 */
function flattenListResponses(responses: ListResponse[]): ListResponse {
  let available: ListResponse['available'] = [];
  for (const response of responses) {
    available = available.concat(response.available);
  }
  return {available};
}
|
dev@449
|
75
|
dev@226
|
/**
 * Streaming service which fronts several library-specific services.
 *
 * Services are registered as factories keyed by library name; a fresh
 * service is created from its factory for every request routed to it.
 */
class AggregateStreamingService implements StreamingService {
  private services: Map<LibraryKey, Factory<PiperStreamingService>>;

  /** Starts out with only the bundled example plugins registered. */
  constructor() {
    const createExamplePluginsService: Factory<PiperStreamingService> = () => {
      const vampService = new PiperVampService(VampExamplePlugins());
      return new PiperStreamingService(vampService);
    };
    this.services = new Map<LibraryKey, Factory<PiperStreamingService>>();
    this.services.set('vamp-example-plugins', createExamplePluginsService);
  }

  /** Register (or replace) the factory used for the library named `key`. */
  addService(key: LibraryKey, service: Factory<PiperStreamingService>): void {
    this.services.set(key, service);
  }

  /**
   * List what every registered service offers, querying the services one
   * after another and merging their responses.
   *
   * NOTE(review): `request` is not forwarded; each service is asked with an
   * empty request — confirm this is intentional.
   */
  list(request: ListRequest): Promise<ListResponse> {
    const listThunks: (() => Promise<ListResponse>)[] = [];
    this.services.forEach(createClient => {
      listThunks.push(() => createClient().list({}));
    });
    return waterfall(listThunks).then(
      responses => flattenListResponses(responses)
    );
  }

  /** Run an extraction request against the service owning its key. */
  process(request: SimpleRequest): Observable<StreamingResponse> {
    return this.dispatch('process', request);
  }

  /**
   * Route `request` to the service registered under the part of
   * `request.key` preceding the first ':'.
   *
   * @returns the chosen service's response stream, or a stream which errors
   *   with 'Invalid key' when no service is registered under that name
   */
  protected dispatch(method: 'process',
                     request: SimpleRequest): Observable<StreamingResponse> {
    const libraryKey = request.key.split(':')[0];
    if (!this.services.has(libraryKey)) {
      return Observable.throw('Invalid key');
    }
    const client = this.services.get(libraryKey)();
    return client[method](request);
  }
}
|
dev@226
|
112
|
dev@250
|
/**
 * Aggregate service which batches streamed responses and only lets one
 * through when the reported progress has advanced by at least one whole
 * percentage point, or has reached 100%.
 *
 * Responses are folded together with `streamingResponseReducer`. After a
 * batch has been emitted, the accumulator's `features` are reset before the
 * next response is folded in.
 */
class ThrottledReducingAggregateService extends AggregateStreamingService {
  protected dispatch(method: 'process',
                     request: SimpleRequest): Observable<StreamingResponse> {
    // Both variables are shared between the scan and filter stages below.
    let lastPercentagePoint = 0;
    let shouldClear = false;
    return super.dispatch(method, request)
      .scan((acc, value) => {
        // The previous accumulated value was emitted: start a new batch.
        if (shouldClear) {
          acc.features = [];
        }
        return streamingResponseReducer(acc, value);
      })
      .filter(val => {
        const progress = val.progress;
        // `| 0` truncates to a whole percentage point.
        const percentage =
          100 * (progress.processedBlockCount / progress.totalBlockCount) | 0;
        const pointDifference = percentage - lastPercentagePoint;
        // Use >= rather than ===: progress can jump by several points at
        // once (e.g. fewer than 100 blocks). Requiring a step of exactly one
        // would leave lastPercentagePoint stale and suppress every further
        // emission until 100%.
        const shouldEmit = pointDifference >= 1 || percentage === 100;
        if (shouldEmit) {
          lastPercentagePoint = percentage;
        }
        shouldClear = shouldEmit;
        return shouldEmit;
      });
  }
}
|
dev@243
|
143
|
dev@40
|
/**
 * Worker-side entry point for feature extraction.
 *
 * Wires a throttled aggregate service to a `WebWorkerStreamingServer` on the
 * given worker scope, and handles 'addRemoteLibraries' messages by loading
 * the requested libraries and replying with an 'import' message listing what
 * is available.
 */
export default class FeatureExtractionWorker {
  private workerScope: DedicatedWorkerGlobalScope;
  private server: WebWorkerStreamingServer;
  private service: AggregateStreamingService;

  /**
   * @param workerScope the worker global scope to listen and reply on
   * @param requireJs module loader used to fetch remote libraries
   */
  constructor(workerScope: DedicatedWorkerGlobalScope,
              private requireJs: RequireJs) {
    this.workerScope = workerScope;
    this.service = new ThrottledReducingAggregateService();
    this.setupImportLibraryListener();
    this.server = new WebWorkerStreamingServer(
      this.workerScope,
      this.service
    );
  }

  /** Best-effort human readable description of a caught value. */
  private static describeFailure(e: any): string {
    const message = e instanceof Error ? e.message : String(e);
    return e && e.cause ?
      `${message} (${FeatureExtractionWorker.describeFailure(e.cause)})` :
      message;
  }

  /**
   * Install the message handler for 'addRemoteLibraries'.
   *
   * Libraries are downloaded one after another and registered with the
   * service. Whatever happens, an 'import' message is posted back with the
   * best list available, so the requester is never left waiting:
   *  - if a download fails, the libraries registered so far are listed;
   *  - if listing fails part way, the responses gathered so far are used.
   */
  private setupImportLibraryListener(): void {
    this.workerScope.onmessage = (ev: MessageEvent) => {
      switch (ev.data.method) {
        case 'addRemoteLibraries': { // TODO rename
          const available: AvailableLibraries = ev.data.params;
          const importThunks = Object.keys(available).map(libraryKey => {
            return () => {
              return this.downloadRemoteLibrary(
                libraryKey,
                available[libraryKey]
              ).then(createService => {
                this.service.addService(
                  libraryKey,
                  () => new PiperStreamingService(createService())
                );
              });
            };
          });

          const warn = (e: any): void => {
            const reason = FeatureExtractionWorker.describeFailure(e);
            console.warn(`${reason}. Try using results so far`);
          };

          waterfall(importThunks)
            // A failed download must not hide the libraries which did load.
            .catch(warn)
            .then(() => this.service.list({}))
            .catch((e): Promise<ListResponse> => {
              warn(e);
              // Only queue failures carry partial results; anything else
              // (e.g. a plain rejection) falls back to an empty list.
              const previous: Promise<any[]> =
                e && e.previousResponses ?
                  e.previousResponses : Promise.resolve([]);
              return previous.then(responses => flattenListResponses(
                responses.filter(
                  res => res != null && Array.isArray(res.available)
                )
              ));
            })
            .then(response => {
              this.workerScope.postMessage({
                method: 'import',
                result: response
              });
            })
            .catch(e => {
              console.warn(FeatureExtractionWorker.describeFailure(e));
            });
          break;
        }
      }
    };
  }

  /**
   * Load the module at `uri` with the injected loader.
   *
   * @param key library name, used only in the failure message
   * @param uri location passed to the loader
   * @returns a promise of a factory. Each call of the factory instantiates
   *   the loaded module; if the instance exposes a `cwrap` function it is
   *   treated as an Emscripten module and wrapped in a `PiperVampService`,
   *   otherwise the instance itself is returned as the service.
   *   The promise rejects with an `Error` when the loader reports a failure.
   */
  private downloadRemoteLibrary(key: LibraryKey,
                                uri: LibraryUri): Promise<Factory<Service>> {
    return new Promise<Factory<Service>>((res, rej) => {
      this.requireJs([uri], (createModule) => {
        res(() => {
          // TODO a factory with more logic probably belongs in piper-js
          const lib: any | EmscriptenModule = createModule();
          const isEmscriptenModule = typeof lib.cwrap === 'function';
          return isEmscriptenModule ? new PiperVampService(lib) : lib; // TODO
        });
      }, () => {
        // Reject with an Error (not a bare string) so handlers can rely on
        // `message` and a stack trace.
        rej(new Error(`Failed to load ${key} remote module.`));
      });
    });
  }
}
|