Mercurial > hg > ugly-duckling
view src/app/services/feature-extraction/FeatureExtractionWorker.ts @ 449:dc7237d84f8d
When a remote extractor fails, still populate menu with ones that were valid.
author | Lucas Thompson <dev@lucas.im> |
---|---|
date | Wed, 28 Jun 2017 10:40:36 +0100 |
parents | 676c4d6d35f7 |
children | c39df81c4dae |
line wrap: on
line source
/** * Created by lucas on 01/12/2016. */ import {PiperVampService, ListRequest, ListResponse, Service} from 'piper'; import { SimpleRequest } from 'piper/HigherLevelUtilities'; import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule'; import { AvailableLibraries } from './feature-extraction.service'; import { DedicatedWorkerGlobalScope, WebWorkerStreamingServer } from 'piper/servers/WebWorkerStreamingServer'; import { PiperStreamingService, StreamingResponse, StreamingService } from 'piper/StreamingService'; import {Observable} from 'rxjs/Observable'; import {EmscriptenModule} from 'piper/PiperVampService'; import {streamingResponseReducer} from './FeatureReducers'; interface MessageEvent { readonly data: any; } type LibraryUri = string; type LibraryKey = string; type RequireJs = (libs: string[], callback: (...libs: any[]) => void, errBack: (...failedLibIds: string[]) => void) => void; type Factory<T> = () => T; function waterfall<T>(tasks: (() => Promise<T>)[]): Promise<T[]> { const reducer = (running: T[], next: Promise<T>): Promise<T[]> => { return next.then(response => { running = running.concat(response); return running; }); }; return tasks.reduce((runningResponses, nextResponse) => { return runningResponses.then(response => { try { return reducer(response, nextResponse()); } catch (e) { throw new QueuedTaskFailure(runningResponses); } }); }, Promise.resolve([])); } class QueuedTaskFailure<T> extends Error { public previousResponses: Promise<T[]>; constructor(previousResponses: Promise<T[]>, message?: string) { super(message || 'Queued task failed.'); this.previousResponses = previousResponses; } } function flattenListResponses(responses: ListResponse[]): ListResponse { return { available: responses.reduce( (flat, res) => flat.concat(res.available), [] ) }; } class AggregateStreamingService implements StreamingService { private services: Map<LibraryKey, Factory<PiperStreamingService>>; constructor() { this.services = new Map<LibraryKey, Factory<PiperStreamingService>>(); this.services.set( 'vamp-example-plugins', () => new PiperStreamingService( new PiperVampService(VampExamplePlugins()) ) ); } addService(key: LibraryKey, service: Factory<PiperStreamingService>): void { this.services.set(key, service); } list(request: ListRequest): Promise<ListResponse> { const listThunks: (() => Promise<ListResponse>)[] = [ ...this.services.values() ].map(createClient => () => createClient().list({})); return waterfall(listThunks) .then(flattenListResponses); } process(request: SimpleRequest): Observable<StreamingResponse> { return this.dispatch('process', request); } protected dispatch(method: 'process', request: SimpleRequest): Observable<StreamingResponse> { const key = request.key.split(':')[0]; return this.services.has(key) ? this.services.get(key)()[method](request) : Observable.throw('Invalid key'); } } class ThrottledReducingAggregateService extends AggregateStreamingService { constructor() { super(); } protected dispatch(method: 'process', request: SimpleRequest): Observable<StreamingResponse> { let lastPercentagePoint = 0; let shouldClear = false; return super.dispatch(method, request) .scan((acc, value) => { if (shouldClear) { acc.features = []; } return streamingResponseReducer(acc, value); }) .filter(val => { const progress = val.progress; const percentage = 100 * (progress.processedBlockCount / progress.totalBlockCount) | 0; const pointDifference = (percentage - lastPercentagePoint); const shouldEmit = pointDifference === 1 || percentage === 100; if (shouldEmit) { lastPercentagePoint = percentage; } shouldClear = shouldEmit; return shouldEmit; }); } } export default class FeatureExtractionWorker { private workerScope: DedicatedWorkerGlobalScope; private server: WebWorkerStreamingServer; private service: AggregateStreamingService; constructor(workerScope: DedicatedWorkerGlobalScope, private requireJs: RequireJs) { this.workerScope = workerScope; this.service = new ThrottledReducingAggregateService(); this.setupImportLibraryListener(); this.server = new WebWorkerStreamingServer( this.workerScope, this.service ); } private setupImportLibraryListener(): void { this.workerScope.onmessage = (ev: MessageEvent) => { switch (ev.data.method) { case 'addRemoteLibraries': // TODO rename const available: AvailableLibraries = ev.data.params; const importThunks = Object.keys(available).map(libraryKey => { return () => { return this.downloadRemoteLibrary( libraryKey, available[libraryKey] ).then(createService => { this.service.addService(libraryKey, () => new PiperStreamingService( createService() )); }); }; }); waterfall(importThunks) .then(() => this.service.list({})) .then(response => { this.workerScope.postMessage({ method: 'import', result: response }); }) .catch((e) => { console.warn(`${e.message}. Try using results so far`); e.previousResponses.then(responses => { this.workerScope.postMessage({ method: 'import', result: flattenListResponses(responses) }); }); }); } }; } private downloadRemoteLibrary(key: LibraryKey, uri: LibraryUri): Promise<Factory<Service>> { return new Promise((res, rej) => { this.requireJs([uri], (createModule) => { res(() => { // TODO a factory with more logic probably belongs in piper-js const lib: any | EmscriptenModule = createModule(); const isEmscriptenModule = typeof lib.cwrap === 'function'; return isEmscriptenModule ? new PiperVampService(lib) : lib; // TODO }); }, (err) => { rej(`Failed to load ${key} remote module.`); }); }); } }