diff src/app/services/feature-extraction/feature-extraction.service.ts @ 226:4865567d9e43

Refactor feature extraction service to use piper streaming client/server. Change FeatureExtractionWorker accordingly and calling code.
author Lucas Thompson <dev@lucas.im>
date Fri, 21 Apr 2017 12:59:41 +0100
parents 2c3fe51ad1f0
children 53ea6406d601
line wrap: on
line diff
--- a/src/app/services/feature-extraction/feature-extraction.service.ts	Fri Apr 21 12:58:55 2017 +0100
+++ b/src/app/services/feature-extraction/feature-extraction.service.ts	Fri Apr 21 12:59:41 2017 +0100
@@ -1,13 +1,19 @@
 import {Injectable, Inject} from '@angular/core';
 import {
-  ListResponse, ListRequest
+  ListResponse
 } from "piper";
 import {
-  SimpleRequest, SimpleResponse
+  SimpleRequest,
+  SimpleResponse
 } from "piper/HigherLevelUtilities";
 import {Subject} from "rxjs/Subject";
 import {Observable} from "rxjs";
 import {Http, Response} from "@angular/http";
+import {
+  countingIdProvider,
+  WebWorkerStreamingClient
+} from "piper/client-stubs/WebWorkerStreamingClient";
+import {RequestId} from "piper/protocols/WebWorkerProtocol";
 
 interface RequestMessage<RequestType> {
   method: string;
@@ -24,6 +30,11 @@
   [libraryKey: string]: RepoUri;
 }
 
+export interface Progress {
+  id: RequestId;
+  value: number; // between 0 and 100, for material-ui
+}
+
 @Injectable()
 export class FeatureExtractionService {
 
@@ -32,6 +43,9 @@
   featuresExtracted$: Observable<SimpleResponse>;
   private librariesUpdated: Subject<ListResponse>;
   librariesUpdated$: Observable<ListResponse>;
+  private progressUpdated: Subject<Progress>;
+  progressUpdated$: Observable<Progress>;
+  private client: WebWorkerStreamingClient;
 
   constructor(private http: Http, @Inject('PiperRepoUri') private repositoryUri: RepoUri) {
     this.worker = new Worker('bootstrap-feature-extraction-worker.js');
@@ -39,40 +53,65 @@
     this.featuresExtracted$ = this.featuresExtracted.asObservable();
     this.librariesUpdated = new Subject<ListResponse>();
     this.librariesUpdated$ = this.librariesUpdated.asObservable();
+    this.progressUpdated = new Subject<Progress>();
+    this.progressUpdated$ = this.progressUpdated.asObservable();
     this.worker.addEventListener('message', (ev: MessageEvent) => {
       const isValidResponse = ev.data.method === 'import'
-        && ev.data.result.available !== undefined;
+        && ev.data.result && ev.data.result.available ;
       if (isValidResponse) {
+        (ev as Event).stopImmediatePropagation();
         this.librariesUpdated.next(ev.data.result);
       }
-    });
+    }, true);
+
+    this.client = new WebWorkerStreamingClient(
+      this.worker,
+      countingIdProvider(0)
+    )
   }
 
   list(): Promise<ListResponse> {
-    return this.request<ListRequest, ListResponse>(
-      {method: 'list', params: {}},
-      (ev: MessageEvent) => ev.data.result.available !== undefined
-    ).then(msg => msg.result);
+    return this.client.list({});
   }
 
-  process(request: SimpleRequest): Promise<SimpleResponse> {
-    return this.request<SimpleRequest, SimpleResponse>(
-      {method: 'process', params: request},
-      (ev: MessageEvent) => ev.data.method === 'process'
-    ).then(msg => {
-      this.featuresExtracted.next(msg.result);
-      return msg.result;
-    });
-  }
-
-  collect(request: SimpleRequest): Promise<SimpleResponse> {
-    return this.request<SimpleRequest, SimpleResponse>(
-      {method: 'collect', params: request},
-      (ev: MessageEvent) => ev.data.method === 'collect'
-    ).then(msg => {
-      this.featuresExtracted.next(msg.result);
-      return msg.result;
-    });
+  extract(analysisItemId: string, request: SimpleRequest): Promise<void> {
+    const arrayReducer = (acc, val) => {
+      acc.push.apply(acc, val);
+      return acc;
+    };
+    const typedArrayReducer = (acc: Float32Array,
+                               val: Float32Array): Float32Array => {
+      return Float32Array.of(...acc, ...val);
+    };
+    return this.client.collect(request)
+      .do(val => {
+        this.progressUpdated.next({
+          id: analysisItemId,
+          value: (val.processedBlockCount / val.totalBlockCount) * 100
+        });
+      })
+      .reduce((acc, val) => {
+        if (acc.features.data instanceof Array &&
+          val.features.data instanceof Array) {
+          acc.features.data = arrayReducer(
+            acc.features.data,
+            val.features.data
+          );
+        } else if (acc.features.data instanceof Float32Array &&
+          val.features.data instanceof Float32Array) {
+          acc.features.data = typedArrayReducer(
+            acc.features.data,
+            val.features.data
+          );
+        } else {
+          throw "Invalid feature output. Aborting";
+        }
+        return acc;
+      })
+      .toPromise()
+      .then((response) => {
+        this.featuresExtracted.next(response);
+      });
   }
 
   updateAvailableLibraries(): Observable<AvailableLibraries> {
@@ -94,18 +133,4 @@
   load(libraryKey: string): void {
     this.worker.postMessage({method: 'import', params: libraryKey});
   }
-
-  private request<Req, Res>(request: RequestMessage<Req>,
-                            predicate: (ev: MessageEvent) => boolean)
-  : Promise<ResponseMessage<Res>> {
-    return new Promise(res => {
-      const listener = (ev: MessageEvent) => {
-        this.worker.removeEventListener('message', listener);
-        if (predicate(ev))
-          res(ev.data);
-      };
-      this.worker.addEventListener('message', listener);
-      this.worker.postMessage(request);
-    }).catch(err => console.error(err));
-  }
 }