changeset 244:b8d1988b774f

Merge pull request #18 from LucasThompson/fix/slow-streaming Fix/slow streaming
author Lucas Thompson <LucasThompson@users.noreply.github.com>
date Wed, 26 Apr 2017 16:21:51 +0100
parents c03b6a1eb9c9 (current diff) 7106cdd59e62 (diff)
children f7c23d489c75
files
diffstat 3 files changed, 91 insertions(+), 28 deletions(-) [+]
line wrap: on
line diff
--- a/src/app/services/feature-extraction/FeatureExtractionWorker.ts	Wed Apr 26 09:38:17 2017 +0100
+++ b/src/app/services/feature-extraction/FeatureExtractionWorker.ts	Wed Apr 26 16:21:51 2017 +0100
@@ -7,7 +7,9 @@
   SimpleRequest
 } from 'piper/HigherLevelUtilities';
 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule';
-import {AvailableLibraries} from './feature-extraction.service';
+import {
+  AvailableLibraries
+} from './feature-extraction.service';
 import {
   DedicatedWorkerGlobalScope,
   WebWorkerStreamingServer
@@ -19,6 +21,7 @@
 } from 'piper/StreamingService';
 import {Observable} from 'rxjs/Observable';
 import {EmscriptenModule} from 'piper/PiperVampService';
+import {streamingResponseReducer} from './FeatureReducers';
 
 interface MessageEvent {
   readonly data: any;
@@ -67,6 +70,29 @@
   }
 }
 
+class ReducingAggregateService extends AggregateStreamingService {
+  constructor() {
+    super();
+  }
+
+  collect(request: SimpleRequest): Observable<StreamingResponse> {
+    let lastPercentagePoint = 0;
+    return super.collect(request)
+      .scan(streamingResponseReducer)
+      .filter(val => {
+        const percentage =
+          100 * (val.processedBlockCount / val.totalBlockCount) | 0;
+        const pointDifference = (percentage - lastPercentagePoint);
+        if (pointDifference === 1 || percentage === 100) {
+          lastPercentagePoint = percentage;
+          return true;
+        } else {
+          return false;
+        }
+      });
+  }
+}
+
 export default class FeatureExtractionWorker {
   private workerScope: DedicatedWorkerGlobalScope;
   private remoteLibraries: Map<LibraryKey, LibraryUri>;
@@ -77,7 +103,7 @@
               private requireJs: RequireJs) {
     this.workerScope = workerScope;
     this.remoteLibraries = new Map<LibraryKey, LibraryUri>();
-    this.service = new AggregateStreamingService();
+    this.service = new ReducingAggregateService();
     this.setupImportLibraryListener();
     this.server = new WebWorkerStreamingServer(
       this.workerScope,
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/app/services/feature-extraction/FeatureReducers.ts	Wed Apr 26 16:21:51 2017 +0100
@@ -0,0 +1,63 @@
+/**
+ * Created by lucast on 26/04/2017.
+ */
+import {StreamingResponse} from 'piper/StreamingService';
+import {Feature} from 'piper/Feature';
+import {SampleType} from 'piper';
+
+export const arrayReducer = <T>(acc: T[], val: T[]): T[] => {
+  acc.push.apply(acc, val);
+  return acc;
+};
+
+export const typedArrayReducer = (acc: Float32Array,
+                                  val: Float32Array): Float32Array => {
+  return Float32Array.of(...acc, ...val);
+};
+
+const inPlaceTypedArrayReducer = (acc: Float32Array,
+                                  val: Float32Array,
+                                  i: number): Float32Array => {
+  acc.set(val, i);
+  return acc;
+};
+
+export const streamingResponseReducer = (acc: StreamingResponse,
+                                         val: StreamingResponse,
+                                         i: number): StreamingResponse => {
+  acc.processedBlockCount = val.processedBlockCount;
+  if (acc.features.data instanceof Array &&
+    val.features.data instanceof Array) {
+    acc.features.data = arrayReducer<Feature>(
+      acc.features.data,
+      val.features.data
+    );
+  } else if (acc.features.data instanceof Float32Array &&
+    val.features.data instanceof Float32Array) {
+    const isOneSamplePerStep = acc.outputDescriptor.configured.sampleType ===
+      SampleType.OneSamplePerStep;
+    if (isOneSamplePerStep) {
+      // for one sample per step vectors we know there will be totalBlockCount
+      // number of samples - so pre-allocate the Float32Array when we know
+      // the totalBlockCount (after receiving the first feature)
+      if ( i === 1  ) {
+        const newBlock = new Float32Array(acc.totalBlockCount);
+        newBlock[0] = acc.features.data[0];
+        acc.features.data = newBlock;
+      }
+      acc.features.data = inPlaceTypedArrayReducer(
+        acc.features.data,
+        val.features.data,
+        i
+      );
+    } else { // if not OneSamplePerStep we have to make a new array each time
+      acc.features.data = typedArrayReducer(
+        acc.features.data,
+        val.features.data
+      );
+    }
+  } else {
+    throw new Error('Invalid feature output. Aborting');
+  }
+  return acc;
+};
--- a/src/app/services/feature-extraction/feature-extraction.service.ts	Wed Apr 26 09:38:17 2017 +0100
+++ b/src/app/services/feature-extraction/feature-extraction.service.ts	Wed Apr 26 16:21:51 2017 +0100
@@ -66,14 +66,6 @@
   }
 
   extract(analysisItemId: string, request: SimpleRequest): Promise<void> {
-    const arrayReducer = (acc, val) => {
-      acc.push.apply(acc, val);
-      return acc;
-    };
-    const typedArrayReducer = (acc: Float32Array,
-                               val: Float32Array): Float32Array => {
-      return Float32Array.of(...acc, ...val);
-    };
     return this.client.collect(request)
       .do(val => {
         this.progressUpdated.next({
@@ -81,24 +73,6 @@
           value: (val.processedBlockCount / val.totalBlockCount) * 100
         });
       })
-      .reduce((acc, val) => {
-        if (acc.features.data instanceof Array &&
-          val.features.data instanceof Array) {
-          acc.features.data = arrayReducer(
-            acc.features.data,
-            val.features.data
-          );
-        } else if (acc.features.data instanceof Float32Array &&
-          val.features.data instanceof Float32Array) {
-          acc.features.data = typedArrayReducer(
-            acc.features.data,
-            val.features.data
-          );
-        } else {
-          throw new Error('Invalid feature output. Aborting');
-        }
-        return acc;
-      })
       .toPromise()
       .then((response) => {
         this.featuresExtracted.next(response);