changeset 305:75a234459d3b

Fix for changes to streaming api in piper-js i.e. collect on the client
author Lucas Thompson <dev@lucas.im>
date Fri, 12 May 2017 08:28:18 +0100
parents 5527c0f82059
children cd117c836ca7
files src/app/services/feature-extraction/FeatureExtractionWorker.ts src/app/services/feature-extraction/feature-extraction.service.ts
diffstat 2 files changed, 30 insertions(+), 20 deletions(-) [+]
line wrap: on
line diff
--- a/src/app/services/feature-extraction/FeatureExtractionWorker.ts	Fri May 12 08:26:18 2017 +0100
+++ b/src/app/services/feature-extraction/FeatureExtractionWorker.ts	Fri May 12 08:28:18 2017 +0100
@@ -63,11 +63,7 @@
     return this.dispatch('process', request);
   }
 
-  collect(request: SimpleRequest): Observable<StreamingResponse> {
-    return this.dispatch('collect', request);
-  }
-
-  protected dispatch(method: 'process' | 'collect',
+  protected dispatch(method: 'process',
                      request: SimpleRequest): Observable<StreamingResponse> {
     const key = request.key.split(':')[0];
     return this.services.has(key) ?
@@ -80,19 +76,27 @@
     super();
   }
 
-  protected dispatch(method: 'process' | 'collect',
+  protected dispatch(method: 'process',
                      request: SimpleRequest): Observable<StreamingResponse> {
     let lastPercentagePoint = 0;
+    let shouldClear = false;
     return super.dispatch(method, request)
-      .scan(streamingResponseReducer)
+      .scan((acc, value) => {
+        if (shouldClear) {
+          acc.features = [];
+        }
+        return streamingResponseReducer(acc, value);
+      })
       .filter(val => {
+        const progress = val.progress;
         const percentage =
-          100 * (val.processedBlockCount / val.totalBlockCount) | 0;
+          100 * (progress.processedBlockCount / progress.totalBlockCount) | 0;
         const pointDifference = (percentage - lastPercentagePoint);
         const shouldEmit = pointDifference === 1 || percentage === 100;
         if (shouldEmit) {
           lastPercentagePoint = percentage;
         }
+        shouldClear = shouldEmit;
         return shouldEmit;
       });
   }
--- a/src/app/services/feature-extraction/feature-extraction.service.ts	Fri May 12 08:26:18 2017 +0100
+++ b/src/app/services/feature-extraction/feature-extraction.service.ts	Fri May 12 08:28:18 2017 +0100
@@ -14,6 +14,7 @@
   WebWorkerStreamingClient
 } from 'piper/client-stubs/WebWorkerStreamingClient';
 import {RequestId} from 'piper/protocols/WebWorkerProtocol';
+import {collect, StreamingConfiguration} from "piper/StreamingService";
 
 type RepoUri = string;
 export interface AvailableLibraries {
@@ -66,19 +67,24 @@
   }
 
   extract(analysisItemId: string, request: SimpleRequest): Promise<void> {
-    return this.client.collect(request)
-      .do(val => {
-        if (val.totalBlockCount > 0) {
-          this.progressUpdated.next({
-            id: analysisItemId,
-            value: (val.processedBlockCount / val.totalBlockCount) * 100
-          });
-        }
-      })
-      .toPromise()
-      .then((response) => {
-        this.featuresExtracted.next(response);
+    let config: StreamingConfiguration;
+    return collect(this.client.process(request), val => {
+      if (val.configuration) {
+        config = val.configuration;
+      }
+      const progress = val.progress;
+      if (progress.totalBlockCount > 0) {
+        this.progressUpdated.next({
+          id: analysisItemId,
+          value: (progress.processedBlockCount / progress.totalBlockCount) * 100
+        });
+      }
+    }).then(features => {
+      this.featuresExtracted.next({
+        features: features,
+        outputDescriptor: config.outputDescriptor
       });
+    });
   }
 
   updateAvailableLibraries(): Observable<AvailableLibraries> {