Mercurial > hg > may
changeset 505:261bf81f29b4
Speed up duplicated streams -- don't advance after every single read; don't recalculate low tide quite so much
author | Chris Cannam |
---|---|
date | Wed, 20 Nov 2013 10:14:53 +0000 |
parents | 4b85578159e1 |
children | a470001a510b 6eafcc3fc29d |
files | src/may/stream/manipulate.yeti |
diffstat | 1 files changed, 18 insertions(+), 8 deletions(-) [+] |
line wrap: on
line diff
--- a/src/may/stream/manipulate.yeti Tue Nov 19 14:37:39 2013 +0000 +++ b/src/may/stream/manipulate.yeti Wed Nov 20 10:14:53 2013 +0000 @@ -266,16 +266,26 @@ array if copies < 2 then map \s [1..copies]; else pos = concurrentHash (); - lowtide () = head (sort (map (at pos) (keys pos))); + var lowtide = 0; var hightide = 0; - var cache = mat.zeroSizeMatrix (); + var adcount = 0; + var cache = mat.toRowMajor (mat.zeroSizeMatrix ()); syncd = synchronized pos; advance i n = - (formerLow = lowtide (); + (wasLow = (pos[i] == lowtide); pos[i] := pos[i] + n; - encroachment = lowtide () - formerLow; - if encroachment > 0 then - cache := mat.columnSlice cache encroachment (mat.width cache); + if wasLow then + adcount := adcount + 1; + if adcount >= copies then + adcount := 0; + formerLow = lowtide; + lowtide := head (sort (map (at pos) (keys pos))); + encroachment = lowtide - formerLow; + if encroachment > 0 then + cache := mat.columnSlice cache encroachment + (mat.width cache); + fi + fi fi); map do instance: pos[instance] := 0; @@ -295,14 +305,14 @@ read count = syncd \(ready = hightide - pos[instance]; if s.finished? and ready <= 0 - then mat.zeroSizeMatrix () + then mat.toRowMajor (mat.zeroSizeMatrix ()) else if count > ready then more = s.read (count - ready); cache := mat.concatHorizontal [cache, more]; hightide := hightide + (mat.width more); fi; - offset = pos[instance] - (lowtide ()); + offset = pos[instance] - lowtide; chunk = mat.columnSlice cache offset (offset + count); advance instance (mat.width chunk); chunk;