annotate src/app/services/feature-extraction/FeatureExtractionWorker.ts @ 509:041468f553e1 tip master

Merge pull request #57 from LucasThompson/fix/session-stack-max-call-stack Fix accidental recursion in PersistentStack
author Lucas Thompson <LucasThompson@users.noreply.github.com>
date Mon, 27 Nov 2017 11:04:30 +0000
parents c39df81c4dae
children
rev   line source
dev@40 1 /**
dev@40 2 * Created by lucas on 01/12/2016.
dev@40 3 */
dev@40 4
dev@497 5 import {ListRequest, ListResponse, Service} from 'piper-js/core';
dev@497 6 import {EmscriptenService as PiperVampService} from 'piper-js/emscripten';
dev@72 7 import {
dev@497 8 OneShotExtractionRequest as SimpleRequest,
dev@497 9 } from 'piper-js/one-shot';
dev@497 10 import { VampExamplePlugins } from 'piper-js/ext/VampExamplePluginsModule';
dev@243 11 import {
dev@243 12 AvailableLibraries
dev@243 13 } from './feature-extraction.service';
dev@226 14 import {
dev@226 15 DedicatedWorkerGlobalScope,
dev@226 16 WebWorkerStreamingServer
dev@497 17 } from 'piper-js/web-worker';
dev@226 18 import {
dev@226 19 PiperStreamingService,
dev@226 20 StreamingResponse,
dev@226 21 StreamingService
dev@497 22 } from 'piper-js/streaming';
dev@236 23 import {Observable} from 'rxjs/Observable';
dev@497 24 import {EmscriptenModule} from 'piper-js/emscripten';
dev@243 25 import {streamingResponseReducer} from './FeatureReducers';
dev@40 26
dev@40 27 interface MessageEvent {
dev@40 28 readonly data: any;
dev@40 29 }
dev@40 30
dev@72 31 type LibraryUri = string;
dev@72 32 type LibraryKey = string;
dev@72 33
dev@328 34 type RequireJs = (libs: string[],
dev@328 35 callback: (...libs: any[]) => void,
dev@328 36 errBack: (...failedLibIds: string[]) => void) => void;
dev@322 37 type Factory<T> = () => T;
dev@72 38
dev@324 39 function waterfall<T>(tasks: (() => Promise<T>)[]): Promise<T[]> {
dev@324 40 const reducer = (running: T[], next: Promise<T>): Promise<T[]> => {
dev@324 41 return next.then(response => {
dev@324 42 running = running.concat(response);
dev@324 43 return running;
dev@324 44 });
dev@324 45 };
dev@324 46
dev@324 47 return tasks.reduce((runningResponses, nextResponse) => {
dev@324 48 return runningResponses.then(response => {
dev@449 49 try {
dev@449 50 return reducer(response, nextResponse());
dev@449 51 } catch (e) {
dev@449 52 throw new QueuedTaskFailure(runningResponses);
dev@449 53 }
dev@328 54 });
dev@324 55 }, Promise.resolve([]));
dev@324 56 }
dev@324 57
dev@449 58 class QueuedTaskFailure<T> extends Error {
dev@449 59 public previousResponses: Promise<T[]>;
dev@449 60
dev@449 61 constructor(previousResponses: Promise<T[]>, message?: string) {
dev@449 62 super(message || 'Queued task failed.');
dev@449 63 this.previousResponses = previousResponses;
dev@449 64 }
dev@449 65 }
dev@449 66
dev@449 67 function flattenListResponses(responses: ListResponse[]): ListResponse {
dev@449 68 return {
dev@449 69 available: responses.reduce(
dev@449 70 (flat, res) => flat.concat(res.available),
dev@449 71 []
dev@449 72 )
dev@449 73 };
dev@449 74 }
dev@449 75
dev@226 76 class AggregateStreamingService implements StreamingService {
dev@322 77 private services: Map<LibraryKey, Factory<PiperStreamingService>>;
dev@226 78
dev@226 79 constructor() {
dev@322 80 this.services = new Map<LibraryKey, Factory<PiperStreamingService>>();
dev@322 81 this.services.set(
dev@322 82 'vamp-example-plugins',
dev@322 83 () => new PiperStreamingService(
dev@322 84 new PiperVampService(VampExamplePlugins())
dev@322 85 )
dev@322 86 );
dev@226 87 }
dev@226 88
dev@322 89 addService(key: LibraryKey, service: Factory<PiperStreamingService>): void {
dev@226 90 this.services.set(key, service);
dev@226 91 }
dev@226 92
dev@226 93 list(request: ListRequest): Promise<ListResponse> {
dev@323 94 const listThunks: (() => Promise<ListResponse>)[] = [
dev@323 95 ...this.services.values()
dev@328 96 ].map(createClient => () => createClient().list({}));
dev@449 97 return waterfall(listThunks)
dev@449 98 .then(flattenListResponses);
dev@226 99 }
dev@226 100
dev@226 101 process(request: SimpleRequest): Observable<StreamingResponse> {
dev@248 102 return this.dispatch('process', request);
dev@226 103 }
dev@226 104
dev@305 105 protected dispatch(method: 'process',
dev@248 106 request: SimpleRequest): Observable<StreamingResponse> {
dev@226 107 const key = request.key.split(':')[0];
dev@322 108 return this.services.has(key) ? this.services.get(key)()[method](request) :
dev@322 109 Observable.throw('Invalid key');
dev@226 110 }
dev@226 111 }
dev@226 112
dev@250 113 class ThrottledReducingAggregateService extends AggregateStreamingService {
dev@243 114 constructor() {
dev@243 115 super();
dev@243 116 }
dev@243 117
dev@305 118 protected dispatch(method: 'process',
dev@248 119 request: SimpleRequest): Observable<StreamingResponse> {
dev@243 120 let lastPercentagePoint = 0;
dev@305 121 let shouldClear = false;
dev@248 122 return super.dispatch(method, request)
dev@305 123 .scan((acc, value) => {
dev@305 124 if (shouldClear) {
dev@305 125 acc.features = [];
dev@305 126 }
dev@305 127 return streamingResponseReducer(acc, value);
dev@305 128 })
dev@243 129 .filter(val => {
dev@305 130 const progress = val.progress;
dev@243 131 const percentage =
dev@305 132 100 * (progress.processedBlockCount / progress.totalBlockCount) | 0;
dev@243 133 const pointDifference = (percentage - lastPercentagePoint);
dev@249 134 const shouldEmit = pointDifference === 1 || percentage === 100;
dev@249 135 if (shouldEmit) {
dev@243 136 lastPercentagePoint = percentage;
dev@243 137 }
dev@305 138 shouldClear = shouldEmit;
dev@249 139 return shouldEmit;
dev@243 140 });
dev@243 141 }
dev@243 142 }
dev@243 143
dev@40 144 export default class FeatureExtractionWorker {
dev@226 145 private workerScope: DedicatedWorkerGlobalScope;
dev@226 146 private server: WebWorkerStreamingServer;
dev@226 147 private service: AggregateStreamingService;
dev@40 148
dev@226 149 constructor(workerScope: DedicatedWorkerGlobalScope,
dev@226 150 private requireJs: RequireJs) {
dev@40 151 this.workerScope = workerScope;
dev@250 152 this.service = new ThrottledReducingAggregateService();
dev@226 153 this.setupImportLibraryListener();
dev@226 154 this.server = new WebWorkerStreamingServer(
dev@226 155 this.workerScope,
dev@226 156 this.service
dev@72 157 );
dev@226 158 }
dev@72 159
dev@226 160 private setupImportLibraryListener(): void {
dev@229 161
dev@44 162 this.workerScope.onmessage = (ev: MessageEvent) => {
dev@44 163 switch (ev.data.method) {
dev@72 164 case 'addRemoteLibraries': // TODO rename
dev@72 165 const available: AvailableLibraries = ev.data.params;
dev@324 166 const importThunks = Object.keys(available).map(libraryKey => {
dev@324 167 return () => {
dev@328 168 return this.downloadRemoteLibrary(
dev@328 169 libraryKey,
dev@328 170 available[libraryKey]
dev@328 171 ).then(createService => {
dev@328 172 this.service.addService(libraryKey,
dev@328 173 () => new PiperStreamingService(
dev@328 174 createService()
dev@328 175 ));
dev@328 176 });
dev@324 177 };
dev@72 178 });
dev@328 179 waterfall(importThunks)
dev@328 180 .then(() => this.service.list({}))
dev@328 181 .then(response => {
dev@324 182 this.workerScope.postMessage({
dev@324 183 method: 'import',
dev@324 184 result: response
dev@324 185 });
dev@449 186 })
dev@449 187 .catch((e) => {
dev@449 188 console.warn(`${e.message}. Try using results so far`);
dev@449 189 e.previousResponses.then(responses => {
dev@449 190 this.workerScope.postMessage({
dev@449 191 method: 'import',
dev@449 192 result: flattenListResponses(responses)
dev@449 193 });
dev@449 194 });
dev@324 195 });
dev@44 196 }
dev@44 197 };
dev@40 198 }
dev@324 199
dev@328 200 private downloadRemoteLibrary(key: LibraryKey,
dev@328 201 uri: LibraryUri): Promise<Factory<Service>> {
dev@324 202 return new Promise((res, rej) => {
dev@449 203 this.requireJs([uri], (createModule) => {
dev@328 204 res(() => {
dev@328 205 // TODO a factory with more logic probably belongs in piper-js
dev@449 206 const lib: any | EmscriptenModule = createModule();
dev@328 207 const isEmscriptenModule = typeof lib.cwrap === 'function';
dev@328 208 return isEmscriptenModule ? new PiperVampService(lib) : lib; // TODO
dev@324 209 });
dev@328 210 }, (err) => {
dev@328 211 rej(`Failed to load ${key} remote module.`);
dev@328 212 });
dev@324 213 });
dev@324 214 }
dev@40 215 }