annotate src/app/services/feature-extraction/FeatureExtractionWorker.ts @ 324:e433a2da0ada

Refactor the import library logic slightly to waterfall the loading of the libraries and list requests, and send one response when all libraries have been loaded.
author Lucas Thompson <dev@lucas.im>
date Tue, 16 May 2017 16:16:57 +0100
parents 72673c954216
children fab49fd10f35
rev   line source
dev@40 1 /**
dev@40 2 * Created by lucas on 01/12/2016.
dev@40 3 */
dev@40 4
dev@324 5 import {PiperVampService, ListRequest, ListResponse} from 'piper';
dev@72 6 import {
dev@226 7 SimpleRequest
dev@72 8 } from 'piper/HigherLevelUtilities';
dev@44 9 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule';
dev@243 10 import {
dev@243 11 AvailableLibraries
dev@243 12 } from './feature-extraction.service';
dev@226 13 import {
dev@226 14 DedicatedWorkerGlobalScope,
dev@226 15 WebWorkerStreamingServer
dev@236 16 } from 'piper/servers/WebWorkerStreamingServer';
dev@226 17 import {
dev@226 18 PiperStreamingService,
dev@226 19 StreamingResponse,
dev@226 20 StreamingService
dev@236 21 } from 'piper/StreamingService';
dev@236 22 import {Observable} from 'rxjs/Observable';
dev@236 23 import {EmscriptenModule} from 'piper/PiperVampService';
dev@243 24 import {streamingResponseReducer} from './FeatureReducers';
dev@40 25
/**
 * Structural view of a message delivered to the worker; only the `data`
 * payload is read by this file.
 */
interface MessageEvent {
  readonly data: any;
}

/** Location of a remote library, as handed to the RequireJs loader. */
type LibraryUri = string;

/** Identifier under which a library and its service factory are stored. */
type LibraryKey = string;

/**
 * Loader function: takes a list of library locations and a callback that
 * receives the loaded modules as its arguments.
 */
type RequireJs = (
  libs: string[],
  callback: (...libs: any[]) => void
) => void;

/** Zero-argument function that returns a `T` each time it is called. */
type Factory<T> = () => T;
dev@72 35
/**
 * Runs asynchronous tasks strictly one after another and collects their
 * results in task order.
 *
 * Each task is a thunk, so a task is only invoked once the previous task's
 * promise has resolved. If a task throws or its promise rejects, the returned
 * promise rejects with that reason and the remaining tasks are not invoked.
 *
 * @param tasks Thunks to invoke sequentially; an empty list resolves to `[]`.
 * @returns A promise of the results, one entry per task, in task order.
 */
function waterfall<T>(tasks: (() => Promise<T>)[]): Promise<T[]> {
  // Explicitly typed seed so the accumulator is Promise<T[]> rather than
  // whatever the compiler infers for a bare empty array literal.
  const seed: Promise<T[]> = Promise.resolve<T[]>([]);

  return tasks.reduce(
    (collected, task) => collected.then(
      // Append with a spread instead of `concat(result)`: `concat` flattens
      // array arguments, which would splice an array-valued result into the
      // output rather than keeping it as a single entry.
      results => task().then(result => [...results, result])
    ),
    seed
  );
}
dev@324 50
/**
 * A StreamingService that fronts several piper services, each registered
 * under a library key.
 *
 * Services are stored as factories and a new instance is created on every
 * `list` and `process` call. The 'vamp-example-plugins' library is registered
 * at construction time.
 */
class AggregateStreamingService implements StreamingService {
  private services: Map<LibraryKey, Factory<PiperStreamingService>>;

  constructor() {
    const bundledExamples: Factory<PiperStreamingService> = () => {
      const vampService = new PiperVampService(VampExamplePlugins());
      return new PiperStreamingService(vampService);
    };
    this.services = new Map<LibraryKey, Factory<PiperStreamingService>>();
    this.services.set('vamp-example-plugins', bundledExamples);
  }

  /** Registers (or replaces) the service factory stored under `key`. */
  addService(key: LibraryKey, service: Factory<PiperStreamingService>): void {
    this.services.set(key, service);
  }

  /** Reports whether a service factory is registered under `key`. */
  hasRemoteService(key: LibraryKey): boolean {
    return this.services.has(key);
  }

  /**
   * Lists every registered service in turn (one at a time, in registration
   * order) and merges the `available` entries into a single response.
   *
   * Note that `request` is not forwarded: each service is queried with an
   * empty request object.
   */
  list(request: ListRequest): Promise<ListResponse> {
    const listThunks: (() => Promise<ListResponse>)[] = [];
    this.services.forEach(createService => {
      listThunks.push(() => createService().list({}));
    });

    return waterfall(listThunks).then(responses => {
      const merged: ListResponse = {available: []};
      for (const response of responses) {
        merged.available = merged.available.concat(response.available);
      }
      return merged;
    });
  }

  /** Runs a process request against the service selected by the request key. */
  process(request: SimpleRequest): Observable<StreamingResponse> {
    return this.dispatch('process', request);
  }

  /**
   * Routes `request` to the service whose library key is the part of
   * `request.key` before the first ':'. Yields an erroring observable
   * ('Invalid key') when no such service is registered.
   */
  protected dispatch(method: 'process',
                     request: SimpleRequest): Observable<StreamingResponse> {
    const [libraryKey] = request.key.split(':');
    if (!this.services.has(libraryKey)) {
      return Observable.throw('Invalid key');
    }
    const createService = this.services.get(libraryKey);
    return createService()[method](request);
  }
}
dev@226 96
/**
 * Aggregate service that throttles streamed responses to progress
 * milestones: responses are folded together with `streamingResponseReducer`
 * and only passed downstream when the integer progress percentage has
 * advanced, or when it reaches 100.
 */
class ThrottledReducingAggregateService extends AggregateStreamingService {
  /**
   * Wraps the base dispatch with accumulation and throttling.
   *
   * @param method The service method to invoke (only 'process').
   * @param request The request forwarded unchanged to the base dispatch.
   * @returns The throttled stream of accumulated responses.
   */
  protected dispatch(method: 'process',
                     request: SimpleRequest): Observable<StreamingResponse> {
    // Both flags are scoped to this dispatch call and shared between the
    // scan and filter stages below.
    let lastPercentagePoint = 0;
    let shouldClear = false;
    return super.dispatch(method, request)
      .scan((acc, value) => {
        // The previous accumulated value was let through by the filter, so
        // start a fresh feature batch before folding in the next response.
        if (shouldClear) {
          acc.features = [];
        }
        return streamingResponseReducer(acc, value);
      })
      .filter(val => {
        const progress = val.progress;
        // `| 0` truncates to an integer percentage.
        const percentage =
          100 * (progress.processedBlockCount / progress.totalBlockCount) | 0;
        const pointDifference = percentage - lastPercentagePoint;
        // Use `>= 1`, not `=== 1`: when the integer percentage skips a point
        // (e.g. 3% -> 5% with 80 blocks) an exact-match test never fires
        // again, stalling all output until 100%.
        const shouldEmit = pointDifference >= 1 || percentage === 100;
        if (shouldEmit) {
          lastPercentagePoint = percentage;
        }
        shouldClear = shouldEmit;
        return shouldEmit;
      });
  }
}
dev@243 127
/**
 * Worker-side entry point: owns the aggregate feature extraction service,
 * hands it to a WebWorkerStreamingServer bound to the worker scope, and
 * handles 'addRemoteLibraries' messages by loading extra libraries through
 * the supplied RequireJs loader.
 */
export default class FeatureExtractionWorker {
  private workerScope: DedicatedWorkerGlobalScope;
  private remoteLibraries: Map<LibraryKey, LibraryUri>;
  private server: WebWorkerStreamingServer;
  private service: AggregateStreamingService;

  /**
   * @param workerScope The worker global scope to receive and post messages on.
   * @param requireJs Loader used to fetch remote library modules.
   */
  constructor(workerScope: DedicatedWorkerGlobalScope,
              private requireJs: RequireJs) {
    this.workerScope = workerScope;
    this.remoteLibraries = new Map<LibraryKey, LibraryUri>();
    this.service = new ThrottledReducingAggregateService();
    this.setupImportLibraryListener();
    this.server = new WebWorkerStreamingServer(
      this.workerScope,
      this.service
    );
  }

  /**
   * Installs the `onmessage` handler for 'addRemoteLibraries'. The libraries
   * in the message params are imported one at a time; once all have loaded,
   * the combined list response is posted back as a single 'import' message.
   */
  private setupImportLibraryListener(): void {
    this.workerScope.onmessage = (ev: MessageEvent) => {
      switch (ev.data.method) {
        case 'addRemoteLibraries': { // TODO rename
          const available: AvailableLibraries = ev.data.params;
          const importThunks = Object.keys(available).map(libraryKey => () => {
            // Record the URI first so that import() can resolve the key.
            this.remoteLibraries.set(libraryKey, available[libraryKey]);
            return this.import(libraryKey);
          });
          // Keep this a single flat chain: returning the list() promise lets
          // a failure in any import, in list(), or in postMessage reach the
          // one catch below instead of becoming an unhandled rejection.
          waterfall(importThunks)
            .then(() => this.service.list({}))
            .then(response => {
              this.workerScope.postMessage({
                method: 'import',
                result: response
              });
            })
            .catch(reason => {
              console.error('Failed to import remote libraries', reason);
            });
          break;
        }
      }
    };
  }

  /**
   * Loads the remote library registered under `key` and adds a service
   * factory for it to the aggregate service.
   *
   * @param key Key of a library previously stored in `remoteLibraries`.
   * @returns A promise resolving to `key` once the factory is registered;
   *   rejects if no URI is known for `key`.
   */
  private import(key: LibraryKey): Promise<LibraryKey> { // TODO return type?
    return new Promise<LibraryKey>((res, rej) => {
      if (!this.remoteLibraries.has(key)) {
        rej(new Error('Invalid remote library key'));
        return;
      }
      // TODO RequireJs can fail... need to reject the promise then
      this.requireJs([this.remoteLibraries.get(key)], (plugin) => {
        const service = () => {
          // TODO a factory with more logic probably belongs in piper-js
          const lib: any | EmscriptenModule = plugin.createLibrary();
          // A `cwrap` function is taken as the sign of an Emscripten module,
          // which is wrapped in PiperVampService; anything else is passed
          // to PiperStreamingService directly.
          const isEmscriptenModule = typeof lib.cwrap === 'function';
          return new PiperStreamingService(
            isEmscriptenModule ? new PiperVampService(lib) : lib // TODO
          );
        };
        this.service.addService(key, service);
        res(key);
      });
    });
  }
}