annotate src/app/services/feature-extraction/FeatureExtractionWorker.ts @ 484:ae96db60f25c

Move extraction toggle button back to right edge.
author Lucas Thompson <dev@lucas.im>
date Mon, 03 Jul 2017 20:32:51 +0100
parents dc7237d84f8d
children c39df81c4dae
rev   line source
dev@40 1 /**
dev@40 2 * Created by lucas on 01/12/2016.
dev@40 3 */
dev@40 4
dev@328 5 import {PiperVampService, ListRequest, ListResponse, Service} from 'piper';
dev@72 6 import {
dev@226 7 SimpleRequest
dev@72 8 } from 'piper/HigherLevelUtilities';
dev@44 9 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule';
dev@243 10 import {
dev@243 11 AvailableLibraries
dev@243 12 } from './feature-extraction.service';
dev@226 13 import {
dev@226 14 DedicatedWorkerGlobalScope,
dev@226 15 WebWorkerStreamingServer
dev@236 16 } from 'piper/servers/WebWorkerStreamingServer';
dev@226 17 import {
dev@226 18 PiperStreamingService,
dev@226 19 StreamingResponse,
dev@226 20 StreamingService
dev@236 21 } from 'piper/StreamingService';
dev@236 22 import {Observable} from 'rxjs/Observable';
dev@236 23 import {EmscriptenModule} from 'piper/PiperVampService';
dev@243 24 import {streamingResponseReducer} from './FeatureReducers';
dev@40 25
dev@40 26 interface MessageEvent {
dev@40 27 readonly data: any;
dev@40 28 }
dev@40 29
dev@72 30 type LibraryUri = string;
dev@72 31 type LibraryKey = string;
dev@72 32
dev@328 33 type RequireJs = (libs: string[],
dev@328 34 callback: (...libs: any[]) => void,
dev@328 35 errBack: (...failedLibIds: string[]) => void) => void;
dev@322 36 type Factory<T> = () => T;
dev@72 37
dev@324 38 function waterfall<T>(tasks: (() => Promise<T>)[]): Promise<T[]> {
dev@324 39 const reducer = (running: T[], next: Promise<T>): Promise<T[]> => {
dev@324 40 return next.then(response => {
dev@324 41 running = running.concat(response);
dev@324 42 return running;
dev@324 43 });
dev@324 44 };
dev@324 45
dev@324 46 return tasks.reduce((runningResponses, nextResponse) => {
dev@324 47 return runningResponses.then(response => {
dev@449 48 try {
dev@449 49 return reducer(response, nextResponse());
dev@449 50 } catch (e) {
dev@449 51 throw new QueuedTaskFailure(runningResponses);
dev@449 52 }
dev@328 53 });
dev@324 54 }, Promise.resolve([]));
dev@324 55 }
dev@324 56
dev@449 57 class QueuedTaskFailure<T> extends Error {
dev@449 58 public previousResponses: Promise<T[]>;
dev@449 59
dev@449 60 constructor(previousResponses: Promise<T[]>, message?: string) {
dev@449 61 super(message || 'Queued task failed.');
dev@449 62 this.previousResponses = previousResponses;
dev@449 63 }
dev@449 64 }
dev@449 65
dev@449 66 function flattenListResponses(responses: ListResponse[]): ListResponse {
dev@449 67 return {
dev@449 68 available: responses.reduce(
dev@449 69 (flat, res) => flat.concat(res.available),
dev@449 70 []
dev@449 71 )
dev@449 72 };
dev@449 73 }
dev@449 74
dev@226 75 class AggregateStreamingService implements StreamingService {
dev@322 76 private services: Map<LibraryKey, Factory<PiperStreamingService>>;
dev@226 77
dev@226 78 constructor() {
dev@322 79 this.services = new Map<LibraryKey, Factory<PiperStreamingService>>();
dev@322 80 this.services.set(
dev@322 81 'vamp-example-plugins',
dev@322 82 () => new PiperStreamingService(
dev@322 83 new PiperVampService(VampExamplePlugins())
dev@322 84 )
dev@322 85 );
dev@226 86 }
dev@226 87
dev@322 88 addService(key: LibraryKey, service: Factory<PiperStreamingService>): void {
dev@226 89 this.services.set(key, service);
dev@226 90 }
dev@226 91
dev@226 92 list(request: ListRequest): Promise<ListResponse> {
dev@323 93 const listThunks: (() => Promise<ListResponse>)[] = [
dev@323 94 ...this.services.values()
dev@328 95 ].map(createClient => () => createClient().list({}));
dev@449 96 return waterfall(listThunks)
dev@449 97 .then(flattenListResponses);
dev@226 98 }
dev@226 99
dev@226 100 process(request: SimpleRequest): Observable<StreamingResponse> {
dev@248 101 return this.dispatch('process', request);
dev@226 102 }
dev@226 103
dev@305 104 protected dispatch(method: 'process',
dev@248 105 request: SimpleRequest): Observable<StreamingResponse> {
dev@226 106 const key = request.key.split(':')[0];
dev@322 107 return this.services.has(key) ? this.services.get(key)()[method](request) :
dev@322 108 Observable.throw('Invalid key');
dev@226 109 }
dev@226 110 }
dev@226 111
dev@250 112 class ThrottledReducingAggregateService extends AggregateStreamingService {
dev@243 113 constructor() {
dev@243 114 super();
dev@243 115 }
dev@243 116
dev@305 117 protected dispatch(method: 'process',
dev@248 118 request: SimpleRequest): Observable<StreamingResponse> {
dev@243 119 let lastPercentagePoint = 0;
dev@305 120 let shouldClear = false;
dev@248 121 return super.dispatch(method, request)
dev@305 122 .scan((acc, value) => {
dev@305 123 if (shouldClear) {
dev@305 124 acc.features = [];
dev@305 125 }
dev@305 126 return streamingResponseReducer(acc, value);
dev@305 127 })
dev@243 128 .filter(val => {
dev@305 129 const progress = val.progress;
dev@243 130 const percentage =
dev@305 131 100 * (progress.processedBlockCount / progress.totalBlockCount) | 0;
dev@243 132 const pointDifference = (percentage - lastPercentagePoint);
dev@249 133 const shouldEmit = pointDifference === 1 || percentage === 100;
dev@249 134 if (shouldEmit) {
dev@243 135 lastPercentagePoint = percentage;
dev@243 136 }
dev@305 137 shouldClear = shouldEmit;
dev@249 138 return shouldEmit;
dev@243 139 });
dev@243 140 }
dev@243 141 }
dev@243 142
dev@40 143 export default class FeatureExtractionWorker {
dev@226 144 private workerScope: DedicatedWorkerGlobalScope;
dev@226 145 private server: WebWorkerStreamingServer;
dev@226 146 private service: AggregateStreamingService;
dev@40 147
dev@226 148 constructor(workerScope: DedicatedWorkerGlobalScope,
dev@226 149 private requireJs: RequireJs) {
dev@40 150 this.workerScope = workerScope;
dev@250 151 this.service = new ThrottledReducingAggregateService();
dev@226 152 this.setupImportLibraryListener();
dev@226 153 this.server = new WebWorkerStreamingServer(
dev@226 154 this.workerScope,
dev@226 155 this.service
dev@72 156 );
dev@226 157 }
dev@72 158
dev@226 159 private setupImportLibraryListener(): void {
dev@229 160
dev@44 161 this.workerScope.onmessage = (ev: MessageEvent) => {
dev@44 162 switch (ev.data.method) {
dev@72 163 case 'addRemoteLibraries': // TODO rename
dev@72 164 const available: AvailableLibraries = ev.data.params;
dev@324 165 const importThunks = Object.keys(available).map(libraryKey => {
dev@324 166 return () => {
dev@328 167 return this.downloadRemoteLibrary(
dev@328 168 libraryKey,
dev@328 169 available[libraryKey]
dev@328 170 ).then(createService => {
dev@328 171 this.service.addService(libraryKey,
dev@328 172 () => new PiperStreamingService(
dev@328 173 createService()
dev@328 174 ));
dev@328 175 });
dev@324 176 };
dev@72 177 });
dev@328 178 waterfall(importThunks)
dev@328 179 .then(() => this.service.list({}))
dev@328 180 .then(response => {
dev@324 181 this.workerScope.postMessage({
dev@324 182 method: 'import',
dev@324 183 result: response
dev@324 184 });
dev@449 185 })
dev@449 186 .catch((e) => {
dev@449 187 console.warn(`${e.message}. Try using results so far`);
dev@449 188 e.previousResponses.then(responses => {
dev@449 189 this.workerScope.postMessage({
dev@449 190 method: 'import',
dev@449 191 result: flattenListResponses(responses)
dev@449 192 });
dev@449 193 });
dev@324 194 });
dev@44 195 }
dev@44 196 };
dev@40 197 }
dev@324 198
dev@328 199 private downloadRemoteLibrary(key: LibraryKey,
dev@328 200 uri: LibraryUri): Promise<Factory<Service>> {
dev@324 201 return new Promise((res, rej) => {
dev@449 202 this.requireJs([uri], (createModule) => {
dev@328 203 res(() => {
dev@328 204 // TODO a factory with more logic probably belongs in piper-js
dev@449 205 const lib: any | EmscriptenModule = createModule();
dev@328 206 const isEmscriptenModule = typeof lib.cwrap === 'function';
dev@328 207 return isEmscriptenModule ? new PiperVampService(lib) : lib; // TODO
dev@324 208 });
dev@328 209 }, (err) => {
dev@328 210 rej(`Failed to load ${key} remote module.`);
dev@328 211 });
dev@324 212 });
dev@324 213 }
dev@40 214 }