Mercurial > hg > ugly-duckling
diff src/app/services/feature-extraction/feature-extraction.service.ts @ 226:4865567d9e43
Refactor feature extraction service to use piper streaming client/server.
Change FeatureExtractionWorker accordingly and calling code.
author | Lucas Thompson <dev@lucas.im> |
---|---|
date | Fri, 21 Apr 2017 12:59:41 +0100 |
parents | 2c3fe51ad1f0 |
children | 53ea6406d601 |
line wrap: on
line diff
--- a/src/app/services/feature-extraction/feature-extraction.service.ts Fri Apr 21 12:58:55 2017 +0100 +++ b/src/app/services/feature-extraction/feature-extraction.service.ts Fri Apr 21 12:59:41 2017 +0100 @@ -1,13 +1,19 @@ import {Injectable, Inject} from '@angular/core'; import { - ListResponse, ListRequest + ListResponse } from "piper"; import { - SimpleRequest, SimpleResponse + SimpleRequest, + SimpleResponse } from "piper/HigherLevelUtilities"; import {Subject} from "rxjs/Subject"; import {Observable} from "rxjs"; import {Http, Response} from "@angular/http"; +import { + countingIdProvider, + WebWorkerStreamingClient +} from "piper/client-stubs/WebWorkerStreamingClient"; +import {RequestId} from "piper/protocols/WebWorkerProtocol"; interface RequestMessage<RequestType> { method: string; @@ -24,6 +30,11 @@ [libraryKey: string]: RepoUri; } +export interface Progress { + id: RequestId; + value: number; // between 0 and 100, for material-ui +} + @Injectable() export class FeatureExtractionService { @@ -32,6 +43,9 @@ featuresExtracted$: Observable<SimpleResponse>; private librariesUpdated: Subject<ListResponse>; librariesUpdated$: Observable<ListResponse>; + private progressUpdated: Subject<Progress>; + progressUpdated$: Observable<Progress>; + private client: WebWorkerStreamingClient; constructor(private http: Http, @Inject('PiperRepoUri') private repositoryUri: RepoUri) { this.worker = new Worker('bootstrap-feature-extraction-worker.js'); @@ -39,40 +53,65 @@ this.featuresExtracted$ = this.featuresExtracted.asObservable(); this.librariesUpdated = new Subject<ListResponse>(); this.librariesUpdated$ = this.librariesUpdated.asObservable(); + this.progressUpdated = new Subject<Progress>(); + this.progressUpdated$ = this.progressUpdated.asObservable(); this.worker.addEventListener('message', (ev: MessageEvent) => { const isValidResponse = ev.data.method === 'import' - && ev.data.result.available !== undefined; + && ev.data.result && ev.data.result.available ; if (isValidResponse) { + (ev as Event).stopImmediatePropagation(); this.librariesUpdated.next(ev.data.result); } - }); + }, true); + + this.client = new WebWorkerStreamingClient( + this.worker, + countingIdProvider(0) + ) } list(): Promise<ListResponse> { - return this.request<ListRequest, ListResponse>( - {method: 'list', params: {}}, - (ev: MessageEvent) => ev.data.result.available !== undefined - ).then(msg => msg.result); + return this.client.list({}); } - process(request: SimpleRequest): Promise<SimpleResponse> { - return this.request<SimpleRequest, SimpleResponse>( - {method: 'process', params: request}, - (ev: MessageEvent) => ev.data.method === 'process' - ).then(msg => { - this.featuresExtracted.next(msg.result); - return msg.result; - }); - } - - collect(request: SimpleRequest): Promise<SimpleResponse> { - return this.request<SimpleRequest, SimpleResponse>( - {method: 'collect', params: request}, - (ev: MessageEvent) => ev.data.method === 'collect' - ).then(msg => { - this.featuresExtracted.next(msg.result); - return msg.result; - }); + extract(analysisItemId: string, request: SimpleRequest): Promise<void> { + const arrayReducer = (acc, val) => { + acc.push.apply(acc, val); + return acc; + }; + const typedArrayReducer = (acc: Float32Array, + val: Float32Array): Float32Array => { + return Float32Array.of(...acc, ...val); + }; + return this.client.collect(request) + .do(val => { + this.progressUpdated.next({ + id: analysisItemId, + value: (val.processedBlockCount / val.totalBlockCount) * 100 + }); + }) + .reduce((acc, val) => { + if (acc.features.data instanceof Array && + val.features.data instanceof Array) { + acc.features.data = arrayReducer( + acc.features.data, + val.features.data + ); + } else if (acc.features.data instanceof Float32Array && + val.features.data instanceof Float32Array) { + acc.features.data = typedArrayReducer( + acc.features.data, + val.features.data + ); + } else { + throw "Invalid feature output. Aborting"; + } + return acc; + }) + .toPromise() + .then((response) => { + this.featuresExtracted.next(response); + }); } updateAvailableLibraries(): Observable<AvailableLibraries> { @@ -94,18 +133,4 @@ load(libraryKey: string): void { this.worker.postMessage({method: 'import', params: libraryKey}); } - - private request<Req, Res>(request: RequestMessage<Req>, - predicate: (ev: MessageEvent) => boolean) - : Promise<ResponseMessage<Res>> { - return new Promise(res => { - const listener = (ev: MessageEvent) => { - this.worker.removeEventListener('message', listener); - if (predicate(ev)) - res(ev.data); - }; - this.worker.addEventListener('message', listener); - this.worker.postMessage(request); - }).catch(err => console.error(err)); - } }