Mercurial > hg > may
view yetilab/stream/filter.yeti @ 194:8148422e9102
Rename truncatedTo to withDuration and make it able to extend as well as truncate
author | Chris Cannam |
---|---|
date | Mon, 06 May 2013 15:03:22 +0100 |
parents | 528f16f988d8 |
children | 3f4f3af724b0 |
line wrap: on
line source
module yetilab.stream.filter; mat = load yetilab.matrix.matrix; bl = load yetilab.block.block; load yetilab.stream.streamtype; minDurationOf d1 d2 = case d1 of Known a: case d2 of Known b: Known (min a b); Unknown (): Unknown (); Infinite (): Known a; esac; Unknown (): case d2 of Known b: Known b; Unknown (): Unknown (); Infinite (): Unknown (); esac; Infinite (): d2; esac; withDuration nsamples s = //!!! should nsamples be a time in seconds? (no) (var pos = 0; { get position () = pos, get channels () = s.channels, get sampleRate () = s.sampleRate, get available () = Known (nsamples - pos), get finished? () = not (nsamples > pos), read count = (n = min count (nsamples - pos); pos := pos + n; if not s.finished? then fromStream = s.read n; got = fromStream.size.columns; mat.concat (Horizontal ()) (fromStream :: if got == n then [] else [mat.zeroMatrix { columns = n - got, rows = s.channels}] fi); else mat.zeroMatrix { columns = n, rows = s.channels } fi), close = s.close, }); delayedBy nsamples s = //!!! should nsamples be a time in seconds? (no) (var prepos = 0; delay = nsamples; zeros n = mat.toRowMajor (prepos := prepos + n; mat.zeroMatrix { rows = s.channels, columns = n }); { get position () = if prepos < delay then prepos else s.position + delay fi, get channels () = s.channels, get sampleRate () = s.sampleRate, get available () = case s.available of Known a: Known (a + delay - prepos); other: other esac, get finished? () = (prepos >= delay) and s.finished?, read count = if prepos >= delay then s.read count elif prepos + count < delay then zeros count else nleft = delay - prepos; left = zeros nleft; right = s.read (count - nleft); mat.concat (Horizontal ()) [left, right]; fi, close = s.close }); multiplexed streams = { get position () = head (sort (map (.position) streams)), // can differ after EOS get channels () = sum (map (.channels) streams), get sampleRate () = (head streams).sampleRate, get available () = fold do dur s: minDurationOf dur s.available done (Infinite ()) streams, get finished? () = any id (map (.finished?) streams), read count = (outs = map do s: s.read count done streams; minlen = head (sort (map do m: m.size.columns done outs)); outs = map do m: mat.resizedTo { rows = m.size.rows, columns = minlen } m done outs; mat.concat (Vertical ()) outs ), close () = for streams do s: s.close() done, }; repeated s = // There is no way to reset a stream (as in principle, they might // be "live") so we can't read from the same stream repeatedly -- // we have to cache its output and then repeat that. This is a // little tricky to do efficiently without knowing how long the // stream is (in general) or how much is going to be requested at // a time. if s.available == Infinite () then s else var pos = 0; var cache = mat.zeroSizeMatrix (); chunks = array []; cachedPartsFor count = (start = pos % cache.size.columns; avail = cache.size.columns - start; if avail >= count then pos := pos + count; [mat.columnSlice start count cache] else pos := pos + avail; mat.columnSlice start avail cache :: cachedPartsFor (count - avail); fi); readFromCache count = (if cache.size.columns == 0 then cache := mat.concat (Horizontal ()) (list chunks); clearArray chunks; fi; if cache.size.columns == 0 then cache else mat.concat (Horizontal ()) (cachedPartsFor count); fi); { get position () = pos, get channels () = s.channels, get sampleRate () = s.sampleRate, get available () = Infinite (), get finished? () = false, read count = if s.finished? then readFromCache count else part = s.read count; len = part.size.columns; push chunks part; pos := pos + len; if len == count then part else mat.concat (Horizontal ()) [part, readFromCache (count - len)]; fi; fi, close = s.close } fi; { withDuration, delayedBy, multiplexed, repeated, //!!!} as { // withDuration is number -> stream -> stream }