changeset 188:8a8b07134cad

Start on repeated stream
author Chris Cannam
date Sun, 05 May 2013 12:41:50 +0100
parents fd16551c2fc8
children fa9a15681ba9
files yetilab/stream/filter.yeti
diffstat 1 files changed, 46 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/yetilab/stream/filter.yeti	Sun May 05 12:41:42 2013 +0100
+++ b/yetilab/stream/filter.yeti	Sun May 05 12:41:50 2013 +0100
@@ -90,10 +90,56 @@
         close () = for streams do s: s.close() done,
     };
 
+repeated s =
+    // There is no way to reset a stream (in principle, they might be
+    // "live") so we can't read from the same stream repeatedly -- we
+    // have to cache its output and then repeat that. This is tricky
+    // to do efficiently without knowing how much is going to be
+    // requested at a time.
+    if s.available == Infinite () then s
+    else
+        var pos = 0;
+        var cache = mat.zeroSizeMatrix ();
+        chunks = array [];
+        readFromCache count =
+           (if cache.size.columns == 0 then
+                cache := mat.concat (Horizontal ()) chunks;
+                clearArray chunks;
+            fi;
+
+            //!!! implement
+            result = mat.zeroSizeMatrix ();
+
+            pos := pos + count;
+            result);
+        {
+            get position () = pos,
+            get channels () = s.channels,
+            get available () = Infinite (),
+            get finished? () = false,
+            read count =
+                if s.finished? then
+                    readFromCache count
+                else
+                    part = s.read count;
+                    len = part.size.columns;
+                    push chunks part;
+                    pos := pos + len;
+                    if len == count then part
+                    else
+                        mat.concat (Horizontal ())
+                           [part, readFromCache (count - len)];
+                    fi;
+                fi,
+            close = s.close
+        }
+    fi;
+
 {
     truncatedTo, 
     delayedBy,
     multiplexed,
+    repeated,
 //!!!} as {
 //    truncatedTo is number -> stream -> stream
 }