changeset 243:7106cdd59e62

Combine the features as they come in. On every percentage point change, emit a StreamingResponse with the collected features so far, enabling progress reporting for the client.
author Lucas Thompson <dev@lucas.im>
date Wed, 26 Apr 2017 16:16:48 +0100
parents ca5fcdfa0851
children b8d1988b774f
files src/app/services/feature-extraction/FeatureExtractionWorker.ts
diffstat 1 files changed, 28 insertions(+), 2 deletions(-) [+]
line wrap: on
line diff
--- a/src/app/services/feature-extraction/FeatureExtractionWorker.ts	Wed Apr 26 16:14:25 2017 +0100
+++ b/src/app/services/feature-extraction/FeatureExtractionWorker.ts	Wed Apr 26 16:16:48 2017 +0100
@@ -7,7 +7,9 @@
   SimpleRequest
 } from 'piper/HigherLevelUtilities';
 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule';
-import {AvailableLibraries} from './feature-extraction.service';
+import {
+  AvailableLibraries
+} from './feature-extraction.service';
 import {
   DedicatedWorkerGlobalScope,
   WebWorkerStreamingServer
@@ -19,6 +21,7 @@
 } from 'piper/StreamingService';
 import {Observable} from 'rxjs/Observable';
 import {EmscriptenModule} from 'piper/PiperVampService';
+import {streamingResponseReducer} from './FeatureReducers';
 
 interface MessageEvent {
   readonly data: any;
@@ -67,6 +70,29 @@
   }
 }
 
+class ReducingAggregateService extends AggregateStreamingService {
+  constructor() {
+    super();
+  }
+
+  collect(request: SimpleRequest): Observable<StreamingResponse> {
+    let lastPercentagePoint = 0;
+    return super.collect(request)
+      .scan(streamingResponseReducer)
+      .filter(val => {
+        const percentage =
+          100 * (val.processedBlockCount / val.totalBlockCount) | 0;
+        const pointDifference = (percentage - lastPercentagePoint);
+        if (pointDifference === 1 || percentage === 100) {
+          lastPercentagePoint = percentage;
+          return true;
+        } else {
+          return false;
+        }
+      });
+  }
+}
+
 export default class FeatureExtractionWorker {
   private workerScope: DedicatedWorkerGlobalScope;
   private remoteLibraries: Map<LibraryKey, LibraryUri>;
@@ -77,7 +103,7 @@
               private requireJs: RequireJs) {
     this.workerScope = workerScope;
     this.remoteLibraries = new Map<LibraryKey, LibraryUri>();
-    this.service = new AggregateStreamingService();
+    this.service = new ReducingAggregateService();
     this.setupImportLibraryListener();
     this.server = new WebWorkerStreamingServer(
       this.workerScope,