annotate src/app/services/feature-extraction/FeatureExtractionWorker.ts @ 328:676c4d6d35f7

Small tidy up, some formatting changes and removal of unnecessary properties.
author Lucas Thompson <dev@lucas.im>
date Wed, 17 May 2017 12:56:39 +0100
parents fab49fd10f35
children dc7237d84f8d
rev   line source
dev@40 1 /**
dev@40 2 * Created by lucas on 01/12/2016.
dev@40 3 */
dev@40 4
dev@328 5 import {PiperVampService, ListRequest, ListResponse, Service} from 'piper';
dev@72 6 import {
dev@226 7 SimpleRequest
dev@72 8 } from 'piper/HigherLevelUtilities';
dev@44 9 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule';
dev@243 10 import {
dev@243 11 AvailableLibraries
dev@243 12 } from './feature-extraction.service';
dev@226 13 import {
dev@226 14 DedicatedWorkerGlobalScope,
dev@226 15 WebWorkerStreamingServer
dev@236 16 } from 'piper/servers/WebWorkerStreamingServer';
dev@226 17 import {
dev@226 18 PiperStreamingService,
dev@226 19 StreamingResponse,
dev@226 20 StreamingService
dev@236 21 } from 'piper/StreamingService';
dev@236 22 import {Observable} from 'rxjs/Observable';
dev@236 23 import {EmscriptenModule} from 'piper/PiperVampService';
dev@243 24 import {streamingResponseReducer} from './FeatureReducers';
dev@40 25
/**
 * Minimal local description of the message events handled in this file.
 * Only the `data` payload is ever read from the event, so nothing else
 * is declared.
 */
interface MessageEvent {
  /** Payload of the message; its shape is not checked here. */
  readonly data: any;
}
dev@40 29
/** Location of a remote library, handed as-is to the module loader. */
type LibraryUri = string;

/** Identifier under which a library's service factory is registered. */
type LibraryKey = string;

/**
 * Signature of the asynchronous module loader injected into the worker.
 *
 * `callback` receives the loaded modules, `errBack` is invoked instead when
 * loading fails.
 */
type RequireJs = (
  libs: string[],
  callback: (...libs: any[]) => void,
  errBack: (...failedLibIds: string[]) => void
) => void;

/** A zero-argument function that produces a `T` each time it is called. */
type Factory<T> = () => T;
dev@72 37
/**
 * Runs promise-returning tasks strictly one after another and collects
 * their results in task order.
 *
 * A task is only started once the previous task's promise has resolved.
 * If any task rejects, the returned promise rejects with that reason and
 * the remaining tasks are never started.
 *
 * @param tasks Thunks that each start one asynchronous operation.
 * @returns A promise of the results, one entry per task (an empty array
 *          for an empty task list).
 */
function waterfall<T>(tasks: (() => Promise<T>)[]): Promise<T[]> {
  return tasks.reduce(
    (collected: Promise<T[]>, task) =>
      collected.then(results =>
        // Append the result as ONE element. Array.prototype.concat would
        // spread an array-valued result into the accumulator and break the
        // one-entry-per-task contract.
        task().then(result => [...results, result])
      ),
    Promise.resolve<T[]>([])
  );
}
dev@324 52
/**
 * A StreamingService that fronts several libraries at once.
 *
 * Each library is registered under a key as a factory; a fresh
 * PiperStreamingService is created from the factory every time a request
 * is served. The 'vamp-example-plugins' library is registered on
 * construction.
 */
class AggregateStreamingService implements StreamingService {
  private services: Map<LibraryKey, Factory<PiperStreamingService>>;

  constructor() {
    const createExamplePlugins: Factory<PiperStreamingService> = () => {
      const vampService = new PiperVampService(VampExamplePlugins());
      return new PiperStreamingService(vampService);
    };
    this.services = new Map<LibraryKey, Factory<PiperStreamingService>>();
    this.services.set('vamp-example-plugins', createExamplePlugins);
  }

  /**
   * Registers a service factory under `key`, replacing any factory that
   * was previously registered under the same key.
   */
  addService(key: LibraryKey, service: Factory<PiperStreamingService>): void {
    this.services.set(key, service);
  }

  /**
   * Lists every registered library, one after another, and merges the
   * `available` entries of all responses into a single response.
   *
   * Note: `request` is not forwarded; each library is queried with an
   * empty list request.
   */
  list(request: ListRequest): Promise<ListResponse> {
    const listThunks: (() => Promise<ListResponse>)[] = [];
    this.services.forEach(createClient => {
      listThunks.push(() => createClient().list({}));
    });

    return waterfall(listThunks).then(responses => {
      let available: ListResponse['available'] = [];
      for (const res of responses) {
        available = available.concat(res.available);
      }
      return {available};
    });
  }

  /** Processes `request` with the library named by the request key. */
  process(request: SimpleRequest): Observable<StreamingResponse> {
    return this.dispatch('process', request);
  }

  /**
   * Routes `request` to the service registered under the part of
   * `request.key` that precedes the first ':'.
   *
   * @returns The selected service's stream, or an Observable that errors
   *          with 'Invalid key' when no service is registered for the key.
   */
  protected dispatch(method: 'process',
                     request: SimpleRequest): Observable<StreamingResponse> {
    const [key] = request.key.split(':');
    if (!this.services.has(key)) {
      return Observable.throw('Invalid key');
    }
    const createService = this.services.get(key);
    return createService()[method](request);
  }
}
dev@226 91
/**
 * An AggregateStreamingService that throttles the response stream.
 *
 * Responses are accumulated with `streamingResponseReducer` and a combined
 * response is only let through when the progress percentage has advanced
 * by at least one whole point since the last emission, or has reached 100.
 */
class ThrottledReducingAggregateService extends AggregateStreamingService {

  /**
   * Dispatches like the base class, then reduces and throttles the stream.
   *
   * @returns A stream of accumulated responses, emitted at most once per
   *          whole percentage point of progress plus at 100%.
   */
  protected dispatch(method: 'process',
                     request: SimpleRequest): Observable<StreamingResponse> {
    let lastPercentagePoint = 0;
    // Set by the filter below; tells the next scan step that the
    // accumulated features were already emitted and must be dropped.
    let shouldClear = false;
    return super.dispatch(method, request)
      .scan((acc, value) => {
        if (shouldClear) {
          acc.features = [];
        }
        return streamingResponseReducer(acc, value);
      })
      .filter(val => {
        const progress = val.progress;
        // `| 0` truncates to an integer (and turns NaN into 0).
        const percentage =
          100 * (progress.processedBlockCount / progress.totalBlockCount) | 0;
        const pointDifference = percentage - lastPercentagePoint;
        // `>= 1` rather than `=== 1`: when progress jumps by more than one
        // point between responses (e.g. fewer than 100 blocks in total) an
        // exact match would never occur again and everything would be
        // withheld until 100%.
        const shouldEmit = pointDifference >= 1 || percentage === 100;
        if (shouldEmit) {
          lastPercentagePoint = percentage;
        }
        shouldClear = shouldEmit;
        return shouldEmit;
      });
  }
}
dev@243 122
/**
 * Worker-side entry point for feature extraction.
 *
 * Wires an aggregate streaming service to a WebWorkerStreamingServer on the
 * given worker scope and listens for 'addRemoteLibraries' messages, which
 * download additional libraries through the injected loader and register
 * them with the service.
 */
export default class FeatureExtractionWorker {
  private workerScope: DedicatedWorkerGlobalScope;
  private server: WebWorkerStreamingServer;
  private service: AggregateStreamingService;

  /**
   * @param workerScope Scope used for receiving and posting messages.
   * @param requireJs Loader used to fetch remote library modules by URI.
   */
  constructor(workerScope: DedicatedWorkerGlobalScope,
              private requireJs: RequireJs) {
    this.workerScope = workerScope;
    this.service = new ThrottledReducingAggregateService();
    this.setupImportLibraryListener();
    this.server = new WebWorkerStreamingServer(
      this.workerScope,
      this.service
    );
  }

  /** Installs the message handler reacting to 'addRemoteLibraries'. */
  private setupImportLibraryListener(): void {
    this.workerScope.onmessage = (ev: MessageEvent) => {
      switch (ev.data.method) {
        case 'addRemoteLibraries': { // TODO rename
          const available: AvailableLibraries = ev.data.params;
          this.importRemoteLibraries(available);
          break;
        }
      }
    };
  }

  /**
   * Downloads and registers the given libraries one after another, then
   * posts an 'import' message carrying the service's list response.
   *
   * A library that fails to load is logged and skipped, so the remaining
   * libraries are still registered and the 'import' reply is still sent.
   */
  private importRemoteLibraries(available: AvailableLibraries): void {
    const importThunks = Object.keys(available).map(libraryKey => () =>
      this.downloadRemoteLibrary(libraryKey, available[libraryKey])
        .then(createService => {
          this.service.addService(
            libraryKey,
            () => new PiperStreamingService(createService())
          );
        })
        .catch((err: any) => {
          console.error(err);
        })
    );

    waterfall(importThunks)
      .then(() => this.service.list({}))
      .then(response => {
        this.workerScope.postMessage({
          method: 'import',
          result: response
        });
      })
      .catch((err: any) => {
        // Listing or posting failed; avoid an unhandled rejection.
        console.error(err);
      });
  }

  /**
   * Loads the module at `uri` and resolves with a factory for its service.
   *
   * The factory calls the module's `createLibrary()`; a result exposing a
   * `cwrap` function is treated as an Emscripten module and wrapped in a
   * PiperVampService, anything else is returned as the service itself.
   *
   * @param key Library key, used only in the failure message.
   * @param uri Location passed to the loader.
   * @returns A promise rejecting with an Error when the loader reports a
   *          failure.
   */
  private downloadRemoteLibrary(key: LibraryKey,
                                uri: LibraryUri): Promise<Factory<Service>> {
    return new Promise<Factory<Service>>((resolve, reject) => {
      this.requireJs([uri], (plugin) => {
        resolve(() => {
          // TODO a factory with more logic probably belongs in piper-js
          const lib: any | EmscriptenModule = plugin.createLibrary();
          const isEmscriptenModule = typeof lib.cwrap === 'function';
          return isEmscriptenModule ? new PiperVampService(lib) : lib; // TODO
        });
      }, () => {
        reject(new Error(`Failed to load ${key} remote module.`));
      });
    });
  }
}