annotate vamp/__init__.py @ 68:e15e684d2af4

Some tricky tests for returned values from the test plugin
author Chris Cannam
date Wed, 14 Jan 2015 14:31:01 +0000
parents 6f6a54963ce8
children ffdeb8808fc1
rev   line source
Chris@56 1 '''A high-level interface to the vampyhost extension module, for quickly and easily running Vamp audio analysis plugins on audio files and buffers.'''
Chris@56 2
Chris@68 3 ###!!! todo: move all the real code out of __init__.py
Chris@68 4
Chris@56 5 import vampyhost
Chris@56 6 import numpy
Chris@56 7
Chris@56 8 def listPlugins():
Chris@56 9 return vampyhost.listPlugins()
Chris@56 10
Chris@56 11 def framesFromArray(arr, stepSize, frameSize):
Chris@56 12 """Generate a list of frames of size frameSize, extracted from the input array arr at stepSize intervals"""
Chris@56 13 # presumably such a function exists in many places, but I need practice
Chris@56 14 assert(stepSize > 0)
Chris@56 15 if arr.ndim == 1: # turn 1d into 2d array with 1 channel
Chris@56 16 arr = numpy.reshape(arr, (1, arr.shape[0]))
Chris@56 17 assert(arr.ndim == 2)
Chris@56 18 n = arr.shape[1]
Chris@56 19 i = 0
Chris@56 20 while (i < n):
Chris@56 21 frame = arr[:, i : i + frameSize]
Chris@56 22 w = frame.shape[1]
Chris@56 23 if (w < frameSize):
Chris@56 24 pad = numpy.zeros((frame.shape[0], frameSize - w))
Chris@56 25 frame = numpy.concatenate((frame, pad), 1)
Chris@56 26 yield frame
Chris@56 27 i = i + stepSize
Chris@56 28
Chris@68 29
Chris@68 30 def loadAndConfigureFor(data, samplerate, key, parameters):
Chris@58 31 plug = vampyhost.loadPlugin(key, samplerate,
Chris@58 32 vampyhost.AdaptInputDomain +
Chris@58 33 vampyhost.AdaptChannelCount)
Chris@64 34
Chris@68 35 plug.setParameterValues(parameters)
Chris@68 36
Chris@68 37 stepSize = plug.getPreferredStepSize()
Chris@68 38 blockSize = plug.getPreferredBlockSize()
Chris@68 39
Chris@68 40 if blockSize == 0:
Chris@68 41 blockSize = 1024
Chris@68 42 if stepSize == 0:
Chris@68 43 stepSize = blockSize ##!!! or blockSize/2, but check this with input domain adapter
Chris@68 44
Chris@68 45 channels = 1
Chris@68 46 if data.ndim > 1:
Chris@68 47 channels = data.shape[0]
Chris@68 48
Chris@68 49 plug.initialise(channels, stepSize, blockSize)
Chris@68 50 return (plug, stepSize, blockSize)
Chris@68 51
Chris@68 52
Chris@68 53 def process(data, samplerate, key, parameters = {}, outputs = []):
Chris@68 54 #!!! docstring
Chris@68 55
Chris@68 56 plug, stepSize, blockSize = loadAndConfigureFor(data, samplerate, key, parameters)
Chris@68 57
Chris@64 58 plugOuts = plug.getOutputs()
Chris@68 59 if plugOuts == []:
Chris@68 60 return
Chris@64 61
Chris@64 62 outIndices = dict(zip([o["identifier"] for o in plugOuts],
Chris@64 63 range(0, len(plugOuts)))) # id -> n
Chris@64 64
Chris@64 65 for o in outputs:
Chris@64 66 assert o in outIndices
Chris@64 67
Chris@66 68 if outputs == []:
Chris@67 69 outputs = [plugOuts[0]["identifier"]]
Chris@66 70
Chris@66 71 singleOutput = (len(outputs) == 1)
Chris@66 72
Chris@56 73 ff = framesFromArray(data, stepSize, blockSize)
Chris@61 74 fi = 0
Chris@64 75
Chris@61 76 for f in ff:
Chris@64 77 results = plug.processBlock(f, vampyhost.frame2RealTime(fi, samplerate))
Chris@64 78 # results is a dict mapping output number -> list of feature dicts
Chris@66 79 for o in outputs:
Chris@66 80 if outIndices[o] in results:
Chris@66 81 for r in results[outIndices[o]]:
Chris@66 82 if singleOutput:
Chris@66 83 yield r
Chris@66 84 else:
Chris@64 85 yield { o: r }
Chris@61 86 fi = fi + stepSize
Chris@61 87
Chris@66 88 results = plug.getRemainingFeatures()
Chris@66 89 for o in outputs:
Chris@66 90 if outIndices[o] in results:
Chris@66 91 for r in results[outIndices[o]]:
Chris@66 92 if singleOutput:
Chris@66 93 yield r
Chris@66 94 else:
Chris@66 95 yield { o: r }
Chris@66 96
Chris@66 97 plug.unload()
Chris@66 98
Chris@66 99