dev@40
|
1 /**
|
dev@40
|
2 * Created by lucas on 01/12/2016.
|
dev@40
|
3 */
|
dev@40
|
4
|
dev@328
|
5 import {PiperVampService, ListRequest, ListResponse, Service} from 'piper';
|
dev@72
|
6 import {
|
dev@226
|
7 SimpleRequest
|
dev@72
|
8 } from 'piper/HigherLevelUtilities';
|
dev@44
|
9 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule';
|
dev@243
|
10 import {
|
dev@243
|
11 AvailableLibraries
|
dev@243
|
12 } from './feature-extraction.service';
|
dev@226
|
13 import {
|
dev@226
|
14 DedicatedWorkerGlobalScope,
|
dev@226
|
15 WebWorkerStreamingServer
|
dev@236
|
16 } from 'piper/servers/WebWorkerStreamingServer';
|
dev@226
|
17 import {
|
dev@226
|
18 PiperStreamingService,
|
dev@226
|
19 StreamingResponse,
|
dev@226
|
20 StreamingService
|
dev@236
|
21 } from 'piper/StreamingService';
|
dev@236
|
22 import {Observable} from 'rxjs/Observable';
|
dev@236
|
23 import {EmscriptenModule} from 'piper/PiperVampService';
|
dev@243
|
24 import {streamingResponseReducer} from './FeatureReducers';
|
dev@40
|
25
|
dev@40
|
26 interface MessageEvent {
|
dev@40
|
27 readonly data: any;
|
dev@40
|
28 }
|
dev@40
|
29
|
dev@72
|
30 type LibraryUri = string;
|
dev@72
|
31 type LibraryKey = string;
|
dev@72
|
32
|
dev@328
|
33 type RequireJs = (libs: string[],
|
dev@328
|
34 callback: (...libs: any[]) => void,
|
dev@328
|
35 errBack: (...failedLibIds: string[]) => void) => void;
|
dev@322
|
36 type Factory<T> = () => T;
|
dev@72
|
37
|
dev@324
|
38 function waterfall<T>(tasks: (() => Promise<T>)[]): Promise<T[]> {
|
dev@324
|
39 const reducer = (running: T[], next: Promise<T>): Promise<T[]> => {
|
dev@324
|
40 return next.then(response => {
|
dev@324
|
41 running = running.concat(response);
|
dev@324
|
42 return running;
|
dev@324
|
43 });
|
dev@324
|
44 };
|
dev@324
|
45
|
dev@324
|
46 return tasks.reduce((runningResponses, nextResponse) => {
|
dev@324
|
47 return runningResponses.then(response => {
|
dev@324
|
48 return reducer(response, nextResponse());
|
dev@328
|
49 });
|
dev@324
|
50 }, Promise.resolve([]));
|
dev@324
|
51 }
|
dev@324
|
52
|
dev@226
|
53 class AggregateStreamingService implements StreamingService {
|
dev@322
|
54 private services: Map<LibraryKey, Factory<PiperStreamingService>>;
|
dev@226
|
55
|
dev@226
|
56 constructor() {
|
dev@322
|
57 this.services = new Map<LibraryKey, Factory<PiperStreamingService>>();
|
dev@322
|
58 this.services.set(
|
dev@322
|
59 'vamp-example-plugins',
|
dev@322
|
60 () => new PiperStreamingService(
|
dev@322
|
61 new PiperVampService(VampExamplePlugins())
|
dev@322
|
62 )
|
dev@322
|
63 );
|
dev@226
|
64 }
|
dev@226
|
65
|
dev@322
|
66 addService(key: LibraryKey, service: Factory<PiperStreamingService>): void {
|
dev@226
|
67 this.services.set(key, service);
|
dev@226
|
68 }
|
dev@226
|
69
|
dev@226
|
70 list(request: ListRequest): Promise<ListResponse> {
|
dev@323
|
71 const listThunks: (() => Promise<ListResponse>)[] = [
|
dev@323
|
72 ...this.services.values()
|
dev@328
|
73 ].map(createClient => () => createClient().list({}));
|
dev@323
|
74
|
dev@328
|
75 return waterfall(listThunks).then(responses => ({
|
dev@328
|
76 available: responses.reduce((flat, res) => flat.concat(res.available), [])
|
dev@328
|
77 }));
|
dev@226
|
78 }
|
dev@226
|
79
|
dev@226
|
80 process(request: SimpleRequest): Observable<StreamingResponse> {
|
dev@248
|
81 return this.dispatch('process', request);
|
dev@226
|
82 }
|
dev@226
|
83
|
dev@305
|
84 protected dispatch(method: 'process',
|
dev@248
|
85 request: SimpleRequest): Observable<StreamingResponse> {
|
dev@226
|
86 const key = request.key.split(':')[0];
|
dev@322
|
87 return this.services.has(key) ? this.services.get(key)()[method](request) :
|
dev@322
|
88 Observable.throw('Invalid key');
|
dev@226
|
89 }
|
dev@226
|
90 }
|
dev@226
|
91
|
dev@250
|
92 class ThrottledReducingAggregateService extends AggregateStreamingService {
|
dev@243
|
93 constructor() {
|
dev@243
|
94 super();
|
dev@243
|
95 }
|
dev@243
|
96
|
dev@305
|
97 protected dispatch(method: 'process',
|
dev@248
|
98 request: SimpleRequest): Observable<StreamingResponse> {
|
dev@243
|
99 let lastPercentagePoint = 0;
|
dev@305
|
100 let shouldClear = false;
|
dev@248
|
101 return super.dispatch(method, request)
|
dev@305
|
102 .scan((acc, value) => {
|
dev@305
|
103 if (shouldClear) {
|
dev@305
|
104 acc.features = [];
|
dev@305
|
105 }
|
dev@305
|
106 return streamingResponseReducer(acc, value);
|
dev@305
|
107 })
|
dev@243
|
108 .filter(val => {
|
dev@305
|
109 const progress = val.progress;
|
dev@243
|
110 const percentage =
|
dev@305
|
111 100 * (progress.processedBlockCount / progress.totalBlockCount) | 0;
|
dev@243
|
112 const pointDifference = (percentage - lastPercentagePoint);
|
dev@249
|
113 const shouldEmit = pointDifference === 1 || percentage === 100;
|
dev@249
|
114 if (shouldEmit) {
|
dev@243
|
115 lastPercentagePoint = percentage;
|
dev@243
|
116 }
|
dev@305
|
117 shouldClear = shouldEmit;
|
dev@249
|
118 return shouldEmit;
|
dev@243
|
119 });
|
dev@243
|
120 }
|
dev@243
|
121 }
|
dev@243
|
122
|
dev@40
|
123 export default class FeatureExtractionWorker {
|
dev@226
|
124 private workerScope: DedicatedWorkerGlobalScope;
|
dev@226
|
125 private server: WebWorkerStreamingServer;
|
dev@226
|
126 private service: AggregateStreamingService;
|
dev@40
|
127
|
dev@226
|
128 constructor(workerScope: DedicatedWorkerGlobalScope,
|
dev@226
|
129 private requireJs: RequireJs) {
|
dev@40
|
130 this.workerScope = workerScope;
|
dev@250
|
131 this.service = new ThrottledReducingAggregateService();
|
dev@226
|
132 this.setupImportLibraryListener();
|
dev@226
|
133 this.server = new WebWorkerStreamingServer(
|
dev@226
|
134 this.workerScope,
|
dev@226
|
135 this.service
|
dev@72
|
136 );
|
dev@226
|
137 }
|
dev@72
|
138
|
dev@226
|
139 private setupImportLibraryListener(): void {
|
dev@229
|
140
|
dev@44
|
141 this.workerScope.onmessage = (ev: MessageEvent) => {
|
dev@44
|
142 switch (ev.data.method) {
|
dev@72
|
143 case 'addRemoteLibraries': // TODO rename
|
dev@72
|
144 const available: AvailableLibraries = ev.data.params;
|
dev@324
|
145 const importThunks = Object.keys(available).map(libraryKey => {
|
dev@324
|
146 return () => {
|
dev@328
|
147 return this.downloadRemoteLibrary(
|
dev@328
|
148 libraryKey,
|
dev@328
|
149 available[libraryKey]
|
dev@328
|
150 ).then(createService => {
|
dev@328
|
151 this.service.addService(libraryKey,
|
dev@328
|
152 () => new PiperStreamingService(
|
dev@328
|
153 createService()
|
dev@328
|
154 ));
|
dev@328
|
155 });
|
dev@324
|
156 };
|
dev@72
|
157 });
|
dev@328
|
158 waterfall(importThunks)
|
dev@328
|
159 .then(() => this.service.list({}))
|
dev@328
|
160 .then(response => {
|
dev@324
|
161 this.workerScope.postMessage({
|
dev@324
|
162 method: 'import',
|
dev@324
|
163 result: response
|
dev@324
|
164 });
|
dev@324
|
165 });
|
dev@44
|
166 }
|
dev@44
|
167 };
|
dev@40
|
168 }
|
dev@324
|
169
|
dev@328
|
170 private downloadRemoteLibrary(key: LibraryKey,
|
dev@328
|
171 uri: LibraryUri): Promise<Factory<Service>> {
|
dev@324
|
172 return new Promise((res, rej) => {
|
dev@328
|
173 this.requireJs([uri], (plugin) => {
|
dev@328
|
174 res(() => {
|
dev@328
|
175 // TODO a factory with more logic probably belongs in piper-js
|
dev@328
|
176 const lib: any | EmscriptenModule = plugin.createLibrary();
|
dev@328
|
177 const isEmscriptenModule = typeof lib.cwrap === 'function';
|
dev@328
|
178 return isEmscriptenModule ? new PiperVampService(lib) : lib; // TODO
|
dev@324
|
179 });
|
dev@328
|
180 }, (err) => {
|
dev@328
|
181 rej(`Failed to load ${key} remote module.`);
|
dev@328
|
182 });
|
dev@324
|
183 });
|
dev@324
|
184 }
|
dev@40
|
185 }
|