annotate src/app/services/feature-extraction/FeatureExtractionWorker.ts @ 305:75a234459d3b

Fix for changes to streaming api in piper-js i.e. collect on the client
author Lucas Thompson <dev@lucas.im>
date Fri, 12 May 2017 08:28:18 +0100
parents c60b03098bae
children a16d968d646e
rev   line source
dev@40 1 /**
dev@40 2 * Created by lucas on 01/12/2016.
dev@40 3 */
dev@40 4
dev@88 5 import {PiperVampService, ListRequest, ListResponse} from 'piper';
dev@72 6 import {
dev@226 7 SimpleRequest
dev@72 8 } from 'piper/HigherLevelUtilities';
dev@44 9 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule';
dev@243 10 import {
dev@243 11 AvailableLibraries
dev@243 12 } from './feature-extraction.service';
dev@226 13 import {
dev@226 14 DedicatedWorkerGlobalScope,
dev@226 15 WebWorkerStreamingServer
dev@236 16 } from 'piper/servers/WebWorkerStreamingServer';
dev@226 17 import {
dev@226 18 PiperStreamingService,
dev@226 19 StreamingResponse,
dev@226 20 StreamingService
dev@236 21 } from 'piper/StreamingService';
dev@236 22 import {Observable} from 'rxjs/Observable';
dev@236 23 import {EmscriptenModule} from 'piper/PiperVampService';
dev@243 24 import {streamingResponseReducer} from './FeatureReducers';
dev@40 25
/**
 * Minimal structural view of a message delivered to the worker.
 * Only the `data` payload is read in this file (its `method` and `params`
 * fields), so nothing else is declared.
 */
interface MessageEvent {
  /** Payload posted by the sender; its shape is not checked statically. */
  readonly data: any;
}
dev@40 29
/** Location handed to the RequireJs loader to fetch a plugin library. */
type LibraryUri = string;
/** Identifier under which a plugin library is registered and looked up. */
type LibraryKey = string;

/**
 * Signature of the module loader injected into the worker: given a list of
 * library locations, it invokes `callback` with the loaded modules.
 */
type RequireJs = (
  libs: string[],
  callback: (...libs: any[]) => void
) => void;
dev@72 34
/**
 * A StreamingService that fronts several PiperStreamingService instances,
 * one per plugin library, and presents them as a single service.
 *
 * Requests are routed by library key, taken as the part of `request.key`
 * that precedes the first ':'.
 */
class AggregateStreamingService implements StreamingService {
  private services: Map<LibraryKey, PiperStreamingService>;

  /** Registers the 'vamp-example-plugins' library at construction time. */
  constructor() {
    this.services = new Map<LibraryKey, PiperStreamingService>();
    this.services.set(
      'vamp-example-plugins',
      new PiperStreamingService(new PiperVampService(VampExamplePlugins()))
    );
  }

  /**
   * Registers (or replaces) the service responsible for a library.
   *
   * @param key library key that requests will be matched against
   * @param service service that will handle requests for that library
   */
  addService(key: LibraryKey, service: PiperStreamingService): void {
    this.services.set(key, service);
  }

  /**
   * Lists what is available across every registered service.
   *
   * @param request list request, forwarded unchanged to each service; an
   *   empty request is used when none is supplied
   * @returns a single response whose `available` array is the concatenation
   *   of each service's `available` array, in registration order
   */
  list(request: ListRequest): Promise<ListResponse> {
    // Previously the caller's request was dropped and `{}` always sent.
    const forwarded: ListRequest = request || {};
    const pending = [...this.services.values()].map(
      client => client.list(forwarded)
    );
    return Promise.all(pending).then(responses => ({
      available: responses.reduce(
        (all, current) => all.concat(current.available),
        []
      )
    }));
  }

  /**
   * Runs a process request against the service owning the request's library.
   *
   * @param request request whose `key` begins with a registered library key
   * @returns the stream of responses produced by the owning service
   */
  process(request: SimpleRequest): Observable<StreamingResponse> {
    return this.dispatch('process', request);
  }

  /**
   * Routes a request to the service registered for its library key.
   *
   * @param method name of the service method to invoke
   * @param request request to forward
   * @returns the service's response stream, or a stream that errors with
   *   'Invalid key' when no service is registered for the library key
   */
  protected dispatch(method: 'process',
                     request: SimpleRequest): Observable<StreamingResponse> {
    const libraryKey = request.key.split(':')[0];
    // Single lookup: registered values are always service instances, so a
    // missing entry is the only way to get undefined here.
    const service = this.services.get(libraryKey);
    return service ?
      service[method](request) : Observable.throw('Invalid key');
  }
}
dev@226 73
/**
 * AggregateStreamingService variant that combines successive streaming
 * responses with `streamingResponseReducer` and only lets a combined
 * response through when progress has advanced by at least one whole
 * percentage point, or has reached 100%.
 */
class ThrottledReducingAggregateService extends AggregateStreamingService {
  /**
   * Dispatches as the base class does, then reduces and throttles the
   * resulting stream.
   *
   * @param method name of the service method to invoke
   * @param request request to forward
   * @returns a stream emitting accumulated responses at most once per
   *   percentage point of progress, plus every response at 100%
   */
  protected dispatch(method: 'process',
                     request: SimpleRequest): Observable<StreamingResponse> {
    let lastPercentagePoint = 0;
    // Set by the filter stage once an accumulated response has been let
    // through, so the next reduction starts from an empty feature list.
    let shouldClear = false;
    return super.dispatch(method, request)
      .scan((acc, value) => {
        if (shouldClear) {
          // NOTE(review): this resets `features` on the accumulator object
          // that was just emitted downstream; presumably subscribers are
          // done with it by the time the next response arrives - confirm.
          acc.features = [];
        }
        return streamingResponseReducer(acc, value);
      })
      .filter(val => {
        const progress = val.progress;
        // `| 0` truncates to an integer percentage (and maps NaN to 0).
        const percentage =
          (100 * (progress.processedBlockCount / progress.totalBlockCount)) | 0;
        const pointDifference = percentage - lastPercentagePoint;
        // `>= 1` rather than `=== 1`: when a single response advances
        // progress by more than one point (short inputs), an exact match
        // never occurs and nothing would be emitted until 100%.
        const shouldEmit = pointDifference >= 1 || percentage === 100;
        if (shouldEmit) {
          lastPercentagePoint = percentage;
        }
        shouldClear = shouldEmit;
        return shouldEmit;
      });
  }
}
dev@243 104
/**
 * Worker-side entry point for feature extraction.
 *
 * Wires a ThrottledReducingAggregateService to a WebWorkerStreamingServer on
 * the given worker scope, and installs a message handler that lets the host
 * register remote plugin libraries and import them on demand.
 */
export default class FeatureExtractionWorker {
  private workerScope: DedicatedWorkerGlobalScope;
  private remoteLibraries: Map<LibraryKey, LibraryUri>;
  private server: WebWorkerStreamingServer;
  private service: AggregateStreamingService;

  /**
   * @param workerScope scope the worker receives messages on and posts to
   * @param requireJs loader used to fetch remote plugin libraries
   */
  constructor(workerScope: DedicatedWorkerGlobalScope,
              private requireJs: RequireJs) {
    this.workerScope = workerScope;
    this.remoteLibraries = new Map<LibraryKey, LibraryUri>();
    this.service = new ThrottledReducingAggregateService();
    this.setupImportLibraryListener();
    this.server = new WebWorkerStreamingServer(
      this.workerScope,
      this.service
    );
  }

  /**
   * Installs the `onmessage` handler for library management messages:
   *
   * - 'addRemoteLibraries': `params` maps library keys to URIs, which are
   *   recorded for later import.
   * - 'import': `params` is a library key; the library is loaded, wrapped in
   *   a PiperStreamingService, registered, and the refreshed listing is
   *   posted back under the same method name.
   *
   * Messages with any other method are ignored by this handler.
   */
  private setupImportLibraryListener(): void {
    this.workerScope.onmessage = (ev: MessageEvent) => {
      const sendResponse = (result: ListResponse) => {
        this.workerScope.postMessage({
          method: ev.data.method,
          result: result
        });
      };
      switch (ev.data.method) {
        // Each case has its own block so its `const` declarations are not
        // visible to (or clash with) the other cases.
        case 'import': {
          const key: LibraryKey = ev.data.params;
          if (this.remoteLibraries.has(key)) {
            this.requireJs([this.remoteLibraries.get(key)], (plugin) => {
              // TODO a factory with more logic probably belongs in piper-js
              const lib: any | EmscriptenModule = plugin.createLibrary();
              // Libraries exposing `cwrap` are treated as Emscripten modules
              // and wrapped; anything else is passed through as-is.
              const isEmscriptenModule = typeof lib.cwrap === 'function';
              const service = new PiperStreamingService(
                isEmscriptenModule ? new PiperVampService(lib) : lib // TODO
              );
              this.service.addService(key, service);
              this.service.list({})
                .then(sendResponse)
                .catch(error => {
                  // Avoid an unhandled rejection if listing or posting fails.
                  console.error('Listing after library import failed.', error);
                });
            });
          } else {
            console.error('Non registered library key.'); // TODO handle error
          }
          break;
        }
        case 'addRemoteLibraries': { // TODO rename
          const available: AvailableLibraries = ev.data.params;
          Object.keys(available).forEach(libraryKey => {
            this.remoteLibraries.set(libraryKey, available[libraryKey]);
          });
          break;
        }
      }
    };
  }
}