changeset 505:261bf81f29b4

Speed up duplicated streams -- don't advance after every single read; don't recalculate low tide quite so much
author Chris Cannam
date Wed, 20 Nov 2013 10:14:53 +0000
parents 4b85578159e1
children a470001a510b 6eafcc3fc29d
files src/may/stream/manipulate.yeti
diffstat 1 files changed, 18 insertions(+), 8 deletions(-) [+]
line wrap: on
line diff
--- a/src/may/stream/manipulate.yeti	Tue Nov 19 14:37:39 2013 +0000
+++ b/src/may/stream/manipulate.yeti	Wed Nov 20 10:14:53 2013 +0000
@@ -266,16 +266,26 @@
     array if copies < 2 then map \s [1..copies];
     else
         pos = concurrentHash ();
-        lowtide () = head (sort (map (at pos) (keys pos)));
+        var lowtide = 0;
         var hightide = 0;
-        var cache = mat.zeroSizeMatrix ();
+        var adcount = 0;
+        var cache = mat.toRowMajor (mat.zeroSizeMatrix ());
         syncd = synchronized pos;
         advance i n =
-           (formerLow = lowtide ();
+           (wasLow = (pos[i] == lowtide);
             pos[i] := pos[i] + n;
-            encroachment = lowtide () - formerLow;
-            if encroachment > 0 then
-                cache := mat.columnSlice cache encroachment (mat.width cache);
+            if wasLow then
+                adcount := adcount + 1;
+                if adcount >= copies then
+                    adcount := 0;
+                    formerLow = lowtide;
+                    lowtide := head (sort (map (at pos) (keys pos)));
+                    encroachment = lowtide - formerLow;
+                    if encroachment > 0 then
+                        cache := mat.columnSlice cache encroachment
+                           (mat.width cache);
+                    fi
+                fi
             fi);
         map do instance:
             pos[instance] := 0;
@@ -295,14 +305,14 @@
                 read count = syncd
                   \(ready = hightide - pos[instance];
                     if s.finished? and ready <= 0 
-                    then mat.zeroSizeMatrix ()
+                    then mat.toRowMajor (mat.zeroSizeMatrix ())
                     else
                         if count > ready then
                             more = s.read (count - ready);
                             cache := mat.concatHorizontal [cache, more];
                             hightide := hightide + (mat.width more);
                         fi;
-                        offset = pos[instance] - (lowtide ());
+                        offset = pos[instance] - lowtide;
                         chunk = mat.columnSlice cache offset (offset + count);
                         advance instance (mat.width chunk);
                         chunk;