Mercurial > hg > ugly-duckling
view src/app/services/feature-extraction/feature-extraction.service.ts @ 226:4865567d9e43
Refactor feature extraction service to use piper streaming client/server.
Change FeatureExtractionWorker accordingly and calling code.
author | Lucas Thompson <dev@lucas.im> |
---|---|
date | Fri, 21 Apr 2017 12:59:41 +0100 |
parents | 2c3fe51ad1f0 |
children | 53ea6406d601 |
line wrap: on
line source
import {Injectable, Inject} from '@angular/core'; import { ListResponse } from "piper"; import { SimpleRequest, SimpleResponse } from "piper/HigherLevelUtilities"; import {Subject} from "rxjs/Subject"; import {Observable} from "rxjs"; import {Http, Response} from "@angular/http"; import { countingIdProvider, WebWorkerStreamingClient } from "piper/client-stubs/WebWorkerStreamingClient"; import {RequestId} from "piper/protocols/WebWorkerProtocol"; interface RequestMessage<RequestType> { method: string; params: RequestType; } interface ResponseMessage<ResponseType> { method: string; result: ResponseType; } type RepoUri = string; export interface AvailableLibraries { [libraryKey: string]: RepoUri; } export interface Progress { id: RequestId; value: number; // between 0 and 100, for material-ui } @Injectable() export class FeatureExtractionService { private worker: Worker; private featuresExtracted: Subject<SimpleResponse>; featuresExtracted$: Observable<SimpleResponse>; private librariesUpdated: Subject<ListResponse>; librariesUpdated$: Observable<ListResponse>; private progressUpdated: Subject<Progress>; progressUpdated$: Observable<Progress>; private client: WebWorkerStreamingClient; constructor(private http: Http, @Inject('PiperRepoUri') private repositoryUri: RepoUri) { this.worker = new Worker('bootstrap-feature-extraction-worker.js'); this.featuresExtracted = new Subject<SimpleResponse>(); this.featuresExtracted$ = this.featuresExtracted.asObservable(); this.librariesUpdated = new Subject<ListResponse>(); this.librariesUpdated$ = this.librariesUpdated.asObservable(); this.progressUpdated = new Subject<Progress>(); this.progressUpdated$ = this.progressUpdated.asObservable(); this.worker.addEventListener('message', (ev: MessageEvent) => { const isValidResponse = ev.data.method === 'import' && ev.data.result && ev.data.result.available ; if (isValidResponse) { (ev as Event).stopImmediatePropagation(); this.librariesUpdated.next(ev.data.result); } }, true); this.client = new WebWorkerStreamingClient( this.worker, countingIdProvider(0) ) } list(): Promise<ListResponse> { return this.client.list({}); } extract(analysisItemId: string, request: SimpleRequest): Promise<void> { const arrayReducer = (acc, val) => { acc.push.apply(acc, val); return acc; }; const typedArrayReducer = (acc: Float32Array, val: Float32Array): Float32Array => { return Float32Array.of(...acc, ...val); }; return this.client.collect(request) .do(val => { this.progressUpdated.next({ id: analysisItemId, value: (val.processedBlockCount / val.totalBlockCount) * 100 }); }) .reduce((acc, val) => { if (acc.features.data instanceof Array && val.features.data instanceof Array) { acc.features.data = arrayReducer( acc.features.data, val.features.data ); } else if (acc.features.data instanceof Float32Array && val.features.data instanceof Float32Array) { acc.features.data = typedArrayReducer( acc.features.data, val.features.data ); } else { throw "Invalid feature output. Aborting"; } return acc; }) .toPromise() .then((response) => { this.featuresExtracted.next(response); }); } updateAvailableLibraries(): Observable<AvailableLibraries> { return this.http.get(this.repositoryUri) .map(res => { const map = res.json(); this.worker.postMessage({ method: 'addRemoteLibraries', params: map }); return map; }) .catch((error: Response | any) => { console.error(error); return Observable.throw(error); }); } load(libraryKey: string): void { this.worker.postMessage({method: 'import', params: libraryKey}); } }