dev@40
|
1 /**
|
dev@40
|
2 * Created by lucas on 01/12/2016.
|
dev@40
|
3 */
|
dev@40
|
4
|
dev@324
|
5 import {PiperVampService, ListRequest, ListResponse} from 'piper';
|
dev@72
|
6 import {
|
dev@226
|
7 SimpleRequest
|
dev@72
|
8 } from 'piper/HigherLevelUtilities';
|
dev@44
|
9 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule';
|
dev@243
|
10 import {
|
dev@243
|
11 AvailableLibraries
|
dev@243
|
12 } from './feature-extraction.service';
|
dev@226
|
13 import {
|
dev@226
|
14 DedicatedWorkerGlobalScope,
|
dev@226
|
15 WebWorkerStreamingServer
|
dev@236
|
16 } from 'piper/servers/WebWorkerStreamingServer';
|
dev@226
|
17 import {
|
dev@226
|
18 PiperStreamingService,
|
dev@226
|
19 StreamingResponse,
|
dev@226
|
20 StreamingService
|
dev@236
|
21 } from 'piper/StreamingService';
|
dev@236
|
22 import {Observable} from 'rxjs/Observable';
|
dev@236
|
23 import {EmscriptenModule} from 'piper/PiperVampService';
|
dev@243
|
24 import {streamingResponseReducer} from './FeatureReducers';
|
dev@40
|
25
|
dev@40
|
/**
 * Minimal shape of a message event received by the worker; only the
 * payload is read in this file.
 */
interface MessageEvent {
  readonly data: any;
}

/** Location of a remote library, as handed to the RequireJs loader. */
type LibraryUri = string;

/**
 * Identifier of a plugin library. It is the key under which a library's
 * service factory is registered.
 */
type LibraryKey = string;

/** Callback invoked by the loader with the loaded modules. */
type RequireJsCallback = (...libs: any[]) => void;

/** Loader function: loads `libs` and passes the results to `callback`. */
type RequireJs = (libs: string[], callback: RequireJsCallback) => void;

/** A zero-argument function producing a fresh `T` each time it is called. */
type Factory<T> = () => T;
|
dev@72
|
35
|
dev@324
|
/**
 * Runs the given promise-returning tasks strictly one after another and
 * collects their results.
 *
 * A task is only invoked once the promise of the previous task has
 * resolved, so at most one task is in flight at any time.
 *
 * @param tasks Thunks which each start an asynchronous operation when called.
 * @returns A promise of the results, in the same order as `tasks`. Resolves
 * to an empty array when `tasks` is empty. If a task rejects (or throws),
 * the returned promise rejects with that reason and the remaining tasks are
 * never invoked.
 */
function waterfall<T>(tasks: (() => Promise<T>)[]): Promise<T[]> {
  return tasks.reduce<Promise<T[]>>(
    (pending, task) => pending.then(results =>
      // Append with a spread literal rather than concat():
      // Array.prototype.concat spreads array arguments, which would flatten
      // a result that is itself an array and break the T[] contract.
      task().then(result => [...results, result])
    ),
    Promise.resolve<T[]>([])
  );
}
|
dev@324
|
50
|
dev@226
|
/**
 * A StreamingService which delegates to a set of PiperStreamingService
 * factories, each registered under a library key.
 *
 * A factory is invoked on every use, so each list / process call works with
 * a newly created PiperStreamingService.
 */
class AggregateStreamingService implements StreamingService {
  private services: Map<LibraryKey, Factory<PiperStreamingService>>;

  /** Registers the bundled 'vamp-example-plugins' library factory. */
  constructor() {
    const createExamplePlugins: Factory<PiperStreamingService> = () => {
      return new PiperStreamingService(
        new PiperVampService(VampExamplePlugins())
      );
    };
    this.services = new Map<LibraryKey, Factory<PiperStreamingService>>();
    this.services.set('vamp-example-plugins', createExamplePlugins);
  }

  /**
   * Registers a service factory under the given key, replacing any factory
   * previously stored for that key.
   */
  addService(key: LibraryKey, service: Factory<PiperStreamingService>): void {
    this.services.set(key, service);
  }

  /** Reports whether a factory is registered under the given key. */
  hasRemoteService(key: LibraryKey): boolean {
    return this.services.has(key);
  }

  /**
   * Lists every registered service, one after another, and merges the
   * `available` entries of all responses into a single response.
   *
   * NOTE(review): `request` is not forwarded; every service is queried with
   * an empty request object.
   */
  list(request: ListRequest): Promise<ListResponse> {
    const listThunks: (() => Promise<ListResponse>)[] = [];
    this.services.forEach(createService => {
      listThunks.push(() => createService().list({}));
    });

    return waterfall(listThunks).then(responses => {
      const merged: ListResponse = {available: []};
      for (const response of responses) {
        merged.available = merged.available.concat(response.available);
      }
      return merged;
    });
  }

  /** Processes the request with the service selected by the request key. */
  process(request: SimpleRequest): Observable<StreamingResponse> {
    return this.dispatch('process', request);
  }

  /**
   * Routes the request to the service registered under the part of
   * `request.key` preceding the first ':'.
   *
   * @returns The selected service's response stream, or an Observable which
   * errors with 'Invalid key' when no service is registered for that key.
   */
  protected dispatch(method: 'process',
                     request: SimpleRequest): Observable<StreamingResponse> {
    const libraryKey = request.key.split(':')[0];
    if (!this.services.has(libraryKey)) {
      return Observable.throw('Invalid key');
    }
    const createService = this.services.get(libraryKey);
    return createService()[method](request);
  }
}
|
dev@226
|
96
|
dev@250
|
/**
 * An AggregateStreamingService which batches streamed responses: responses
 * are folded together with streamingResponseReducer and only passed on when
 * the integer progress percentage has advanced, or has reached 100.
 */
class ThrottledReducingAggregateService extends AggregateStreamingService {
  constructor() {
    super();
  }

  /**
   * Dispatches the request via the base class, then accumulates and
   * throttles the resulting stream.
   *
   * @param method The service method to invoke.
   * @param request The request to forward to the selected service.
   * @returns A stream emitting the accumulated response each time the whole
   * percentage of processed blocks increases, and for every response seen
   * at 100 percent.
   */
  protected dispatch(method: 'process',
                     request: SimpleRequest): Observable<StreamingResponse> {
    // Percentage at which a response was last let through.
    let lastPercentagePoint = 0;
    // Set once a response has been emitted, so that the next accumulation
    // starts from an empty feature list instead of re-sending old features.
    let shouldClear = false;
    return super.dispatch(method, request)
      .scan((acc, value) => {
        if (shouldClear) {
          acc.features = [];
        }
        return streamingResponseReducer(acc, value);
      })
      .filter(val => {
        const progress = val.progress;
        // `| 0` truncates to an integer (and maps NaN to 0).
        const percentage =
          (100 * (progress.processedBlockCount / progress.totalBlockCount)) | 0;
        const pointDifference = percentage - lastPercentagePoint;
        // Emit on any advance of at least one point. Requiring exactly one
        // point would stall all output until 100% as soon as progress jumps
        // by more than a point, since lastPercentagePoint would never catch
        // up again.
        const shouldEmit = pointDifference >= 1 || percentage === 100;
        if (shouldEmit) {
          lastPercentagePoint = percentage;
        }
        shouldClear = shouldEmit;
        return shouldEmit;
      });
  }
}
|
dev@243
|
127
|
dev@40
|
/**
 * Worker-side entry point: serves feature extraction requests through a
 * WebWorkerStreamingServer and loads additional plugin libraries on demand
 * when asked to via an 'addRemoteLibraries' message.
 */
export default class FeatureExtractionWorker {
  private workerScope: DedicatedWorkerGlobalScope;
  private remoteLibraries: Map<LibraryKey, LibraryUri>;
  private server: WebWorkerStreamingServer;
  private service: AggregateStreamingService;

  /**
   * @param workerScope The worker global scope to listen and post on.
   * @param requireJs Loader used to fetch remote library modules.
   */
  constructor(workerScope: DedicatedWorkerGlobalScope,
              private requireJs: RequireJs) {
    this.workerScope = workerScope;
    this.remoteLibraries = new Map<LibraryKey, LibraryUri>();
    this.service = new ThrottledReducingAggregateService();
    // The message listener is installed before the server is constructed;
    // this order is kept as in the original.
    this.setupImportLibraryListener();
    this.server = new WebWorkerStreamingServer(this.workerScope, this.service);
  }

  /**
   * Installs the worker's onmessage handler. On an 'addRemoteLibraries'
   * message it records and imports each library in `params` one at a time,
   * then posts the aggregated list response back as an 'import' message.
   * Messages with any other method are ignored by this handler.
   */
  private setupImportLibraryListener(): void {
    this.workerScope.onmessage = (ev: MessageEvent) => {
      if (ev.data.method !== 'addRemoteLibraries') { // TODO rename
        return;
      }

      const available: AvailableLibraries = ev.data.params;
      const importThunks = Object.keys(available).map(libraryKey => () => {
        // Recorded lazily, right before the import of this library starts.
        this.remoteLibraries.set(libraryKey, available[libraryKey]);
        return this.import(libraryKey);
      });

      waterfall(importThunks)
        .then(() => this.service.list({}))
        .then(response => {
          this.workerScope.postMessage({
            method: 'import',
            result: response
          });
        });
    };
  }

  /**
   * Loads the remote library registered under `key` and registers a service
   * factory for it on the aggregate service.
   *
   * @returns A promise resolving to `key` once the factory is registered;
   * rejects with 'Invalid remote library key' when no URI is known for it.
   */
  private import(key: LibraryKey): Promise<LibraryKey> { // TODO return type?
    return new Promise((resolve, reject) => {
      if (!this.remoteLibraries.has(key)) {
        reject('Invalid remote library key');
        return;
      }

      const uri = this.remoteLibraries.get(key);
      // TODO RequireJs can fail... need to reject the promise then
      this.requireJs([uri], (plugin) => {
        const createService = () => {
          // TODO a factory with more logic probably belongs in piper-js
          const lib: any | EmscriptenModule = plugin.createLibrary();
          // A library exposing cwrap is treated as an Emscripten module
          // and wrapped; anything else is passed through unchanged.
          const hasCwrap = typeof lib.cwrap === 'function';
          const client = hasCwrap ? new PiperVampService(lib) : lib; // TODO
          return new PiperStreamingService(client);
        };
        this.service.addService(key, createService);
        resolve(key);
      });
    });
  }
}
|