annotate src/app/services/feature-extraction/FeatureExtractionWorker.ts @ 297:73beb0e970c5

Modify FeatureReducers so that it compiles.
author Chris Cannam <cannam@all-day-breakfast.com>
date Wed, 10 May 2017 14:47:26 +0100
parents c60b03098bae
children 75a234459d3b
rev   line source
dev@40 1 /**
dev@40 2 * Created by lucas on 01/12/2016.
dev@40 3 */
dev@40 4
dev@88 5 import {PiperVampService, ListRequest, ListResponse} from 'piper';
dev@72 6 import {
dev@226 7 SimpleRequest
dev@72 8 } from 'piper/HigherLevelUtilities';
dev@44 9 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule';
dev@243 10 import {
dev@243 11 AvailableLibraries
dev@243 12 } from './feature-extraction.service';
dev@226 13 import {
dev@226 14 DedicatedWorkerGlobalScope,
dev@226 15 WebWorkerStreamingServer
dev@236 16 } from 'piper/servers/WebWorkerStreamingServer';
dev@226 17 import {
dev@226 18 PiperStreamingService,
dev@226 19 StreamingResponse,
dev@226 20 StreamingService
dev@236 21 } from 'piper/StreamingService';
dev@236 22 import {Observable} from 'rxjs/Observable';
dev@236 23 import {EmscriptenModule} from 'piper/PiperVampService';
dev@243 24 import {streamingResponseReducer} from './FeatureReducers';
dev@40 25
dev@40 26 interface MessageEvent {
dev@40 27 readonly data: any;
dev@40 28 }
dev@40 29
dev@72 30 type LibraryUri = string;
dev@72 31 type LibraryKey = string;
dev@72 32
dev@72 33 type RequireJs = (libs: string[], callback: (...libs: any[]) => void) => void;
dev@72 34
dev@226 35 class AggregateStreamingService implements StreamingService {
dev@226 36 private services: Map<LibraryKey, PiperStreamingService>;
dev@226 37
dev@226 38 constructor() {
dev@226 39 this.services = new Map<LibraryKey, PiperStreamingService>();
dev@226 40 this.services.set(
dev@226 41 'vamp-example-plugins',
dev@226 42 new PiperStreamingService(new PiperVampService(VampExamplePlugins()))
dev@226 43 );
dev@226 44 }
dev@226 45
dev@226 46 addService(key: LibraryKey, service: PiperStreamingService): void {
dev@226 47 this.services.set(key, service);
dev@226 48 }
dev@226 49
dev@226 50 list(request: ListRequest): Promise<ListResponse> {
dev@226 51 return Promise.all(
dev@226 52 [...this.services.values()].map(client => client.list({}))
dev@226 53 ).then(allAvailable => ({
dev@226 54 available: allAvailable.reduce(
dev@226 55 (all, current) => all.concat(current.available),
dev@226 56 []
dev@226 57 )
dev@226 58 })
dev@226 59 );
dev@226 60 }
dev@226 61
dev@226 62 process(request: SimpleRequest): Observable<StreamingResponse> {
dev@248 63 return this.dispatch('process', request);
dev@226 64 }
dev@226 65
dev@226 66 collect(request: SimpleRequest): Observable<StreamingResponse> {
dev@248 67 return this.dispatch('collect', request);
dev@248 68 }
dev@248 69
dev@248 70 protected dispatch(method: 'process' | 'collect',
dev@248 71 request: SimpleRequest): Observable<StreamingResponse> {
dev@226 72 const key = request.key.split(':')[0];
dev@226 73 return this.services.has(key) ?
dev@248 74 this.services.get(key)[method](request) : Observable.throw('Invalid key');
dev@226 75 }
dev@226 76 }
dev@226 77
dev@250 78 class ThrottledReducingAggregateService extends AggregateStreamingService {
dev@243 79 constructor() {
dev@243 80 super();
dev@243 81 }
dev@243 82
dev@248 83 protected dispatch(method: 'process' | 'collect',
dev@248 84 request: SimpleRequest): Observable<StreamingResponse> {
dev@243 85 let lastPercentagePoint = 0;
dev@248 86 return super.dispatch(method, request)
dev@243 87 .scan(streamingResponseReducer)
dev@243 88 .filter(val => {
dev@243 89 const percentage =
dev@243 90 100 * (val.processedBlockCount / val.totalBlockCount) | 0;
dev@243 91 const pointDifference = (percentage - lastPercentagePoint);
dev@249 92 const shouldEmit = pointDifference === 1 || percentage === 100;
dev@249 93 if (shouldEmit) {
dev@243 94 lastPercentagePoint = percentage;
dev@243 95 }
dev@249 96 return shouldEmit;
dev@243 97 });
dev@243 98 }
dev@243 99 }
dev@243 100
dev@40 101 export default class FeatureExtractionWorker {
dev@226 102 private workerScope: DedicatedWorkerGlobalScope;
dev@72 103 private remoteLibraries: Map<LibraryKey, LibraryUri>;
dev@226 104 private server: WebWorkerStreamingServer;
dev@226 105 private service: AggregateStreamingService;
dev@40 106
dev@226 107 constructor(workerScope: DedicatedWorkerGlobalScope,
dev@226 108 private requireJs: RequireJs) {
dev@40 109 this.workerScope = workerScope;
dev@72 110 this.remoteLibraries = new Map<LibraryKey, LibraryUri>();
dev@250 111 this.service = new ThrottledReducingAggregateService();
dev@226 112 this.setupImportLibraryListener();
dev@226 113 this.server = new WebWorkerStreamingServer(
dev@226 114 this.workerScope,
dev@226 115 this.service
dev@72 116 );
dev@226 117 }
dev@72 118
dev@226 119 private setupImportLibraryListener(): void {
dev@229 120
dev@44 121 this.workerScope.onmessage = (ev: MessageEvent) => {
dev@64 122 const sendResponse = (result) => {
dev@64 123 this.workerScope.postMessage({
dev@64 124 method: ev.data.method,
dev@64 125 result: result
dev@64 126 });
dev@64 127 };
dev@44 128 switch (ev.data.method) {
dev@72 129 case 'import':
dev@72 130 const key: LibraryKey = ev.data.params;
dev@72 131 if (this.remoteLibraries.has(key)) {
dev@72 132 this.requireJs([this.remoteLibraries.get(key)], (plugin) => {
dev@229 133 // TODO a factory with more logic probably belongs in piper-js
dev@229 134 const lib: any | EmscriptenModule = plugin.createLibrary();
dev@229 135 const isEmscriptenModule = typeof lib.cwrap === 'function';
dev@229 136 const service = new PiperStreamingService(
dev@229 137 isEmscriptenModule ? new PiperVampService(lib) : lib // TODO
dev@229 138 );
dev@229 139 this.service.addService(key, service);
dev@226 140 this.service.list({}).then(sendResponse);
dev@72 141 });
dev@72 142 } else {
dev@72 143 console.error('Non registered library key.'); // TODO handle error
dev@72 144 }
dev@72 145 break;
dev@72 146 case 'addRemoteLibraries': // TODO rename
dev@72 147 const available: AvailableLibraries = ev.data.params;
dev@236 148 Object.keys(available).forEach(libraryKey => {
dev@236 149 this.remoteLibraries.set(libraryKey, available[libraryKey]);
dev@72 150 });
dev@44 151 }
dev@44 152 };
dev@40 153 }
dev@40 154 }