view yetilab/stream/filter.yeti @ 194:8148422e9102

Rename truncatedTo to withDuration and make it able to extend as well as truncate
author Chris Cannam
date Mon, 06 May 2013 15:03:22 +0100
parents 528f16f988d8
children 3f4f3af724b0
line wrap: on
line source

module yetilab.stream.filter;

mat = load yetilab.matrix.matrix;
bl = load yetilab.block.block;

load yetilab.stream.streamtype;

// Combine two stream durations into the duration of a stream that ends
// as soon as either of them ends.
//
// Each argument is one of Known n, Unknown () or Infinite ().
//
//  - Infinite () never limits the result, so the other duration is
//    returned as it is.
//  - Two Known durations give the smaller of the two.
//  - Unknown () combined with anything finite gives Unknown (): the
//    unknown stream might end sooner than the known one, so the known
//    figure cannot be promised. This holds whichever argument is the
//    unknown one, so the result does not depend on argument order.
minDurationOf d1 d2 =
    case d1 of 
    Known a:
        case d2 of 
        Known b: Known (min a b);
        Unknown (): Unknown ();
        Infinite (): Known a;
        esac;
    Unknown ():
        // Mirror of the (Known, Unknown) case above: an unknown
        // duration may be shorter than whatever d2 reports.
        case d2 of 
        Known _: Unknown ();
        Unknown (): Unknown ();
        Infinite (): Unknown ();
        esac;
    Infinite ():
        d2;
    esac;

// Wrap stream s so that it yields exactly nsamples columns in total.
// If s runs out first, the remainder is filled with zero-valued
// columns; reads never hand out more than the columns still owed.
// The wrapper tracks its own position and always reports a Known
// number of available columns.
withDuration nsamples s = //!!! should nsamples be a time in seconds? (no)
   (var pos = 0;
    // A block of ncols zero columns with as many rows as s has channels
    silence ncols = mat.zeroMatrix { columns = ncols, rows = s.channels };
    {
        get position () = pos,
        get channels () = s.channels,
        get sampleRate () = s.sampleRate,
        get available () = Known (nsamples - pos),
        get finished? () = not (pos < nsamples),
        read count =
           (n = min count (nsamples - pos);
            // The wrapper's position advances by the amount returned,
            // regardless of how much the source can still supply
            pos := pos + n;
            if s.finished? then
                silence n
            else
                chunk = s.read n;
                shortfall = n - chunk.size.columns;
                // Pad on the right only if the source came up short
                padding =
                    if shortfall == 0 then []
                    else [silence shortfall]
                    fi;
                mat.concat (Horizontal ()) (chunk :: padding)
            fi),
        close = s.close,
    });

// Wrap stream s so that nsamples zero-valued columns are produced
// before any data is read from s. Once that lead-in has been handed
// out in full, reads are passed straight through to s.
delayedBy nsamples s = //!!! should nsamples be a time in seconds? (no)
   (var emitted = 0; // number of lead-in columns handed out so far
    // Produce n zero columns and count them towards the lead-in.
    // The counter is updated before the matrix is built.
    silence n = mat.toRowMajor
       (emitted := emitted + n;
        mat.zeroMatrix { rows = s.channels, columns = n });
    {
        get position () =
            if emitted < nsamples then emitted
            else s.position + nsamples
            fi,
        get channels () = s.channels,
        get sampleRate () = s.sampleRate,
        get available () =
            case s.available of
            // Whatever part of the lead-in is outstanding adds to a
            // known length; other durations pass through unchanged
            Known a: Known (a + nsamples - emitted);
            other: other
            esac,
        get finished? () = (emitted >= nsamples) and s.finished?,
        read count =
            if emitted >= nsamples then
                // Lead-in complete: read directly from the source
                s.read count
            elif emitted + count < nsamples then
                // Request lies wholly within the lead-in
                silence count
            else
                // Request straddles the end of the lead-in: finish the
                // zeros first, then take the rest from the source
                pending = nsamples - emitted;
                lead = silence pending;
                tail = s.read (count - pending);
                mat.concat (Horizontal ()) [lead, tail]
            fi,
        close = s.close
    });

// Combine several streams into one whose channels are those of all the
// input streams stacked in list order. The sample rate is taken from
// the first stream. The combined stream is finished as soon as any
// input is finished, and each read is trimmed to the shortest block
// returned by any input.
multiplexed streams =
    {
        // Smallest position of any input; positions can differ after EOS
        get position () =
            head (sort (map do s: s.position done streams)),
        get channels () = sum (map do s: s.channels done streams),
        get sampleRate () = (head streams).sampleRate,
        get available () =
            fold do shortest s: minDurationOf shortest s.available done
                (Infinite ()) streams,
        get finished? () = any do s: s.finished? done streams,
        read count =
           (blocks = map do s: s.read count done streams;
            shortest = head (sort (map do b: b.size.columns done blocks));
            // Cut every block to the common length so they can be stacked
            trim b =
                mat.resizedTo { rows = b.size.rows, columns = shortest } b;
            mat.concat (Vertical ()) (map trim blocks)),
        close () = for streams do s: s.close () done,
    };

// Return a stream that plays s and then, once s has finished, repeats
// what s produced over and over. If s already reports an Infinite
// duration it is returned unchanged. Otherwise the returned stream
// reports Infinite () available and is never finished.
repeated s =
    // There is no way to reset a stream (as in principle, they might
    // be "live") so we can't read from the same stream repeatedly --
    // we have to cache its output and then repeat that. This is a
    // little tricky to do efficiently without knowing how long the
    // stream is (in general) or how much is going to be requested at
    // a time.
    if s.available == Infinite () then s
    else
        // Running count of columns handed out; advanced both when
        // reading from s and when serving slices from the cache
        var pos = 0;
        // Consolidated copy of the source output, built on the first
        // call to readFromCache; zero-sized until then
        var cache = mat.zeroSizeMatrix ();
        // Blocks read from s that have not yet been merged into cache
        chunks = array [];
        // Return a list of column slices of the cache, starting at the
        // current position modulo the cache length and wrapping round
        // to the start as often as needed, advancing pos as it goes.
        // Must only be called with a non-empty cache, as it takes a
        // modulus by the cache width.
        // NOTE(review): assumes mat.columnSlice takes (start, length,
        // matrix) in that order -- confirm against the matrix module.
        cachedPartsFor count =
           (start = pos % cache.size.columns;
            avail = cache.size.columns - start;
            if avail >= count then
                pos := pos + count;
                [mat.columnSlice start count cache]
            else
                pos := pos + avail;
                mat.columnSlice start avail cache ::
                    cachedPartsFor (count - avail);
            fi);
        // Serve count columns from the cache, first building the cache
        // from the stored chunks if it is still empty. If the cache is
        // empty even after that, the empty cache matrix itself is
        // returned rather than count columns.
        readFromCache count =
           (if cache.size.columns == 0 then
                cache := mat.concat (Horizontal ()) (list chunks);
                clearArray chunks;
            fi;
            if cache.size.columns == 0 then
                cache
            else
                mat.concat (Horizontal ()) (cachedPartsFor count);
            fi);
        {
            get position () = pos,
            get channels () = s.channels,
            get sampleRate () = s.sampleRate,
            get available () = Infinite (),
            get finished? () = false,
            read count =
                if s.finished? then
                    readFromCache count
                else
                    // Still reading the source: remember the block for
                    // later repetition before returning it
                    part = s.read count;
                    len = part.size.columns;
                    push chunks part;
                    pos := pos + len;
                    if len == count then part
                    else
                        // Source came up short: make up the remainder
                        // from the cached material
                        mat.concat (Horizontal ())
                           [part, readFromCache (count - len)];
                    fi;
                fi,
            close = s.close
        }
    fi;

// Module value: the stream filter functions made available to code that
// loads this module. minDurationOf is not listed here and so remains
// internal to the module.
{
    withDuration, 
    delayedBy,
    multiplexed,
    repeated,
// The two comment lines below are a disabled type ascription for the
// exported structure, left in place as a reminder.
//!!!} as {
//    withDuration is number -> stream -> stream
}