annotate src/app/services/feature-extraction/FeatureExtractionWorker.ts @ 248:4224929943bc

Small refactoring to avoid duplication of process and collect. It really seems like there is no need to have both methods.
author Lucas Thompson <dev@lucas.im>
date Thu, 27 Apr 2017 10:32:09 +0100
parents 7106cdd59e62
children 55be5d2e96f6
rev   line source
dev@40 1 /**
dev@40 2 * Created by lucas on 01/12/2016.
dev@40 3 */
dev@40 4
dev@88 5 import {PiperVampService, ListRequest, ListResponse} from 'piper';
dev@72 6 import {
dev@226 7 SimpleRequest
dev@72 8 } from 'piper/HigherLevelUtilities';
dev@44 9 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule';
dev@243 10 import {
dev@243 11 AvailableLibraries
dev@243 12 } from './feature-extraction.service';
dev@226 13 import {
dev@226 14 DedicatedWorkerGlobalScope,
dev@226 15 WebWorkerStreamingServer
dev@236 16 } from 'piper/servers/WebWorkerStreamingServer';
dev@226 17 import {
dev@226 18 PiperStreamingService,
dev@226 19 StreamingResponse,
dev@226 20 StreamingService
dev@236 21 } from 'piper/StreamingService';
dev@236 22 import {Observable} from 'rxjs/Observable';
dev@236 23 import {EmscriptenModule} from 'piper/PiperVampService';
dev@243 24 import {streamingResponseReducer} from './FeatureReducers';
dev@40 25
/**
 * File-local view of a worker message event: only the `data` payload is
 * read by the code in this file.
 */
interface MessageEvent {
  readonly data: any;
}

/** Location handed to the module loader in order to fetch a library. */
type LibraryUri = string;

/** Name under which a library (and the service wrapping it) is registered. */
type LibraryKey = string;

/** Callback invoked by the loader with the loaded modules, in order. */
type LoadedLibrariesCallback = (...libs: any[]) => void;

/**
 * Signature of the loader function injected into the worker: it takes the
 * list of module locations to load and a callback receiving the results.
 */
type RequireJs = (libs: string[], callback: LoadedLibrariesCallback) => void;
dev@72 34
/**
 * Presents several PiperStreamingService instances, each registered under a
 * library key, as one StreamingService. Extraction requests are routed to the
 * service whose key matches the prefix of the request key (the text before
 * the first ':').
 */
class AggregateStreamingService implements StreamingService {
  private services = new Map<LibraryKey, PiperStreamingService>();

  /** Starts out with the bundled example plugins already registered. */
  constructor() {
    const examplePlugins = new PiperStreamingService(
      new PiperVampService(VampExamplePlugins())
    );
    this.services.set('vamp-example-plugins', examplePlugins);
  }

  /**
   * Registers a service under the given key, replacing any service that was
   * previously stored under that key.
   */
  addService(key: LibraryKey, service: PiperStreamingService): void {
    this.services.set(key, service);
  }

  /**
   * Queries every registered service and merges their `available` lists into
   * a single response.
   *
   * Note that the incoming request is not forwarded: each service is asked
   * with an empty request object.
   */
  list(request: ListRequest): Promise<ListResponse> {
    const pendingLists = Array.from(
      this.services.values(),
      service => service.list({})
    );
    return Promise.all(pendingLists).then(responses => {
      const available = responses.reduce(
        (merged, response) => merged.concat(response.available),
        []
      );
      return {available};
    });
  }

  /** Routes a 'process' request to the service owning the request key. */
  process(request: SimpleRequest): Observable<StreamingResponse> {
    return this.dispatch('process', request);
  }

  /** Routes a 'collect' request to the service owning the request key. */
  collect(request: SimpleRequest): Observable<StreamingResponse> {
    return this.dispatch('collect', request);
  }

  /**
   * Shared routing for process and collect.
   *
   * @param method which method of the target service to invoke
   * @param request the request; the part of `request.key` before the first
   *   ':' selects the service
   * @returns the target service's stream, or a stream that errors with
   *   'Invalid key' when no service is registered under that prefix
   */
  protected dispatch(method: 'process' | 'collect',
                     request: SimpleRequest): Observable<StreamingResponse> {
    const [libraryKey] = request.key.split(':');
    if (!this.services.has(libraryKey)) {
      return Observable.throw('Invalid key');
    }
    return this.services.get(libraryKey)[method](request);
  }
}
dev@226 77
/**
 * Aggregate service that folds the stream of responses through
 * streamingResponseReducer and throttles what is passed downstream, so that
 * a response is only emitted when the integer progress percentage has moved
 * forward (or has reached 100).
 */
class ReducingAggregateService extends AggregateStreamingService {
  /**
   * Dispatches via the base class, then reduces and throttles the result.
   *
   * @param method which method of the target service to invoke
   * @param request the request forwarded unchanged to the base dispatch
   * @returns the reduced stream, filtered down to progress-advancing responses
   */
  protected dispatch(method: 'process' | 'collect',
                     request: SimpleRequest): Observable<StreamingResponse> {
    // Tracked per dispatch call: the last percentage that was let through.
    let lastPercentagePoint = 0;
    return super.dispatch(method, request)
      .scan(streamingResponseReducer)
      .filter(val => {
        // `| 0` truncates to an integer; a non-finite ratio (e.g. a zero
        // totalBlockCount) becomes 0.
        const percentage =
          100 * (val.processedBlockCount / val.totalBlockCount) | 0;
        // Compare with >= rather than ===: when progress jumps by more than
        // one point between responses, an exact match on 1 would never be
        // satisfied again and all further updates would be withheld until
        // 100% was reached.
        const hasAdvanced = percentage - lastPercentagePoint >= 1;
        if (hasAdvanced || percentage === 100) {
          lastPercentagePoint = percentage;
          return true;
        }
        return false;
      });
  }
}
dev@243 101
/**
 * Worker-side entry point: owns the aggregate extraction service, exposes it
 * through a WebWorkerStreamingServer bound to the worker scope, and handles
 * the 'import' / 'addRemoteLibraries' messages used to load further
 * libraries at runtime.
 */
export default class FeatureExtractionWorker {
  private workerScope: DedicatedWorkerGlobalScope;
  private remoteLibraries: Map<LibraryKey, LibraryUri>;
  private server: WebWorkerStreamingServer;
  private service: AggregateStreamingService;

  /**
   * @param workerScope the worker global scope to listen on and post to
   * @param requireJs loader used to fetch a remote library by its URI
   */
  constructor(workerScope: DedicatedWorkerGlobalScope,
              private requireJs: RequireJs) {
    this.workerScope = workerScope;
    this.remoteLibraries = new Map<LibraryKey, LibraryUri>();
    this.service = new ReducingAggregateService();
    // NOTE(review): this handler is installed through the `onmessage`
    // property before the server is constructed on the same scope. If the
    // server also assigns `onmessage`, this handler would be replaced -
    // confirm against WebWorkerStreamingServer.
    this.setupImportLibraryListener();
    this.server = new WebWorkerStreamingServer(
      this.workerScope,
      this.service
    );
  }

  /**
   * Installs the message handler for library management:
   *  - 'import': loads the library registered under the given key, wraps it
   *    in a PiperStreamingService, registers it with the aggregate service
   *    and posts back the refreshed list of available extractors.
   *  - 'addRemoteLibraries': records key -> URI pairs that may be imported
   *    later.
   * Messages with any other method are ignored by this handler.
   */
  private setupImportLibraryListener(): void {
    this.workerScope.onmessage = (ev: MessageEvent) => {
      // Replies are tagged with the method of the message being answered.
      const sendResponse = (result: ListResponse) => {
        this.workerScope.postMessage({
          method: ev.data.method,
          result: result
        });
      };
      switch (ev.data.method) {
        // Each case has its own block so that its declarations are not
        // visible to (or clashing with) the other cases.
        case 'import': {
          const key: LibraryKey = ev.data.params;
          if (this.remoteLibraries.has(key)) {
            this.requireJs([this.remoteLibraries.get(key)], (plugin) => {
              // TODO a factory with more logic probably belongs in piper-js
              const lib: any | EmscriptenModule = plugin.createLibrary();
              // Libraries exposing `cwrap` are treated as Emscripten modules
              // and wrapped; anything else is passed through as-is.
              const isEmscriptenModule = typeof lib.cwrap === 'function';
              const service = new PiperStreamingService(
                isEmscriptenModule ? new PiperVampService(lib) : lib // TODO
              );
              this.service.addService(key, service);
              this.service.list({})
                .then(sendResponse)
                // Report failures instead of leaving the rejection unhandled.
                .catch(error => {
                  console.error('Failed to list available libraries.', error);
                });
            });
          } else {
            console.error('Non registered library key.'); // TODO handle error
          }
          break;
        }
        case 'addRemoteLibraries': { // TODO rename
          const available: AvailableLibraries = ev.data.params;
          Object.keys(available).forEach(libraryKey => {
            this.remoteLibraries.set(libraryKey, available[libraryKey]);
          });
          break;
        }
      }
    };
  }
}