annotate src/app/services/feature-extraction/feature-extraction.service.ts @ 226:4865567d9e43

Refactor feature extraction service to use piper streaming client/server. Change FeatureExtractionWorker accordingly and calling code.
author Lucas Thompson <dev@lucas.im>
date Fri, 21 Apr 2017 12:59:41 +0100
parents 2c3fe51ad1f0
children 53ea6406d601
rev   line source
dev@74 1 import {Injectable, Inject} from '@angular/core';
dev@47 2 import {
dev@226 3 ListResponse
dev@47 4 } from "piper";
dev@64 5 import {
dev@226 6 SimpleRequest,
dev@226 7 SimpleResponse
dev@64 8 } from "piper/HigherLevelUtilities";
dev@51 9 import {Subject} from "rxjs/Subject";
dev@51 10 import {Observable} from "rxjs";
dev@74 11 import {Http, Response} from "@angular/http";
dev@226 12 import {
dev@226 13 countingIdProvider,
dev@226 14 WebWorkerStreamingClient
dev@226 15 } from "piper/client-stubs/WebWorkerStreamingClient";
dev@226 16 import {RequestId} from "piper/protocols/WebWorkerProtocol";
dev@44 17
/**
 * Envelope for a message sent to the worker: a method name plus its
 * method-specific parameters. Same `{method, params}` shape as the objects
 * this file passes to `worker.postMessage`.
 */
interface RequestMessage<TParams> {
  method: string;
  params: TParams;
}
dev@47 22
/**
 * Envelope for a message received from the worker: the method name being
 * answered plus its result payload. Same `{method, result}` shape that the
 * worker 'message' listener in this file reads from `ev.data`.
 */
interface ResponseMessage<TResult> {
  method: string;
  result: TResult;
}
dev@40 27
/** Location of a library repository, kept as a plain string. */
type RepoUri = string;

/** Dictionary from library key to the URI that library can be fetched from. */
export interface AvailableLibraries {
  [key: string]: RepoUri;
}
dev@74 32
/** Progress notification for a single extraction request. */
export interface Progress {
  /** Identifier of the request this update belongs to. */
  id: RequestId;
  /** Completion percentage, between 0 and 100, for material-ui. */
  value: number;
}
dev@226 37
/**
 * Angular service wrapping the feature-extraction Web Worker.
 *
 * The service creates the worker, talks to it through a
 * `WebWorkerStreamingClient` for `list` / `extract`, and posts plain
 * `{method, params}` messages for library management. Results are published
 * on three observable streams:
 *
 *  - `featuresExtracted$` - the accumulated response of each `extract` call,
 *  - `librariesUpdated$`  - the result of a worker 'import' response,
 *  - `progressUpdated$`   - percentage progress while an extraction runs.
 */
@Injectable()
export class FeatureExtractionService {

  private worker: Worker;
  private featuresExtracted: Subject<SimpleResponse>;
  featuresExtracted$: Observable<SimpleResponse>;
  private librariesUpdated: Subject<ListResponse>;
  librariesUpdated$: Observable<ListResponse>;
  private progressUpdated: Subject<Progress>;
  progressUpdated$: Observable<Progress>;
  private client: WebWorkerStreamingClient;

  /**
   * @param http Angular HTTP client used to fetch the library repository.
   * @param repositoryUri URI of the library repository, injected under the
   *   'PiperRepoUri' token.
   */
  constructor(private http: Http,
              @Inject('PiperRepoUri') private repositoryUri: RepoUri) {
    this.worker = new Worker('bootstrap-feature-extraction-worker.js');
    this.featuresExtracted = new Subject<SimpleResponse>();
    this.featuresExtracted$ = this.featuresExtracted.asObservable();
    this.librariesUpdated = new Subject<ListResponse>();
    this.librariesUpdated$ = this.librariesUpdated.asObservable();
    this.progressUpdated = new Subject<Progress>();
    this.progressUpdated$ = this.progressUpdated.asObservable();

    // Registered before the streaming client is created. 'import' responses
    // carrying an `available` list are consumed here and stopped, so listeners
    // added afterwards (presumably the streaming client's own - verify) never
    // see them.
    this.worker.addEventListener('message', (ev: MessageEvent) => {
      const isValidResponse = ev.data.method === 'import'
        && ev.data.result && ev.data.result.available;
      if (isValidResponse) {
        (ev as Event).stopImmediatePropagation();
        this.librariesUpdated.next(ev.data.result);
      }
    }, true);

    this.client = new WebWorkerStreamingClient(
      this.worker,
      countingIdProvider(0)
    );
  }

  /**
   * Asks the worker, via the streaming client, for the list of available
   * extractors.
   *
   * @returns A promise for the client's list response.
   */
  list(): Promise<ListResponse> {
    return this.client.list({});
  }

  /**
   * Runs an extraction request, merging the streamed partial responses into a
   * single response.
   *
   * A `Progress` value is pushed to `progressUpdated$` for every streamed
   * chunk, and the merged response is pushed to `featuresExtracted$` once the
   * stream completes.
   *
   * @param analysisItemId Identifier attached to the emitted progress updates.
   * @param request The extraction request handed to the streaming client.
   * @returns A promise that resolves once the merged response has been
   *   published. It rejects with the string
   *   "Invalid feature output. Aborting" when two chunks do not both carry
   *   plain arrays or both carry `Float32Array`s.
   */
  extract(analysisItemId: string, request: SimpleRequest): Promise<void> {
    // Appends in place, as the original did. An index loop is used instead of
    // `push.apply` because spreading a large chunk into call arguments can
    // exceed the engine's argument limit and throw a RangeError.
    const appendAll = (target: any[], extra: any[]): any[] => {
      for (let i = 0; i < extra.length; ++i) {
        target.push(extra[i]);
      }
      return target;
    };

    // Allocates the combined buffer once and copies both halves with `set`,
    // rather than passing every sample as an argument to `Float32Array.of`.
    const concatFloat32 = (head: Float32Array,
                           tail: Float32Array): Float32Array => {
      const merged = new Float32Array(head.length + tail.length);
      merged.set(head);
      merged.set(tail, head.length);
      return merged;
    };

    return this.client.collect(request)
      .do(val => {
        this.progressUpdated.next({
          id: analysisItemId,
          value: (val.processedBlockCount / val.totalBlockCount) * 100
        });
      })
      // No seed: the first streamed response is the accumulator and is
      // extended in place with the data of each later chunk.
      .reduce((acc, val) => {
        if (acc.features.data instanceof Array &&
          val.features.data instanceof Array) {
          acc.features.data = appendAll(
            acc.features.data,
            val.features.data
          );
        } else if (acc.features.data instanceof Float32Array &&
          val.features.data instanceof Float32Array) {
          acc.features.data = concatFloat32(
            acc.features.data,
            val.features.data
          );
        } else {
          // Still thrown as a string so existing rejection handlers receive
          // the same value as before.
          throw "Invalid feature output. Aborting";
        }
        return acc;
      })
      .toPromise()
      .then((response) => {
        this.featuresExtracted.next(response);
      });
  }

  /**
   * Fetches the library map from `repositoryUri`, forwards it to the worker
   * in an 'addRemoteLibraries' message, and emits it to the subscriber.
   *
   * @returns An observable of the parsed library map. HTTP or parsing errors
   *   are logged to the console and then re-thrown on the observable.
   */
  updateAvailableLibraries(): Observable<AvailableLibraries> {
    return this.http.get(this.repositoryUri)
      .map(res => {
        const map = res.json();
        this.worker.postMessage({
          method: 'addRemoteLibraries',
          params: map
        });
        return map;
      })
      .catch((error: Response | any) => {
        console.error(error);
        return Observable.throw(error);
      });
  }

  /**
   * Asks the worker to import the library registered under `libraryKey`.
   * Nothing is returned. A matching 'import' response from the worker is
   * published on `librariesUpdated$` by the listener set up in the
   * constructor.
   *
   * @param libraryKey Key of the library to import.
   */
  load(libraryKey: string): void {
    this.worker.postMessage({method: 'import', params: libraryKey});
  }
}