Chris@56
|
1 '''A high-level interface to the vampyhost extension module, for quickly and easily running Vamp audio analysis plugins on audio files and buffers.'''
|
Chris@56
|
2
|
Chris@68
|
3 ###!!! todo: move all the real code out of __init__.py
|
Chris@68
|
4
|
Chris@56
|
5 import vampyhost
|
Chris@56
|
6 import numpy
|
Chris@56
|
7
|
Chris@56
|
def listPlugins():
    """Return whatever vampyhost.listPlugins() reports.

    This is a direct pass-through to the vampyhost extension module
    (presumably the identifiers of the installed Vamp plugins --
    confirm against the vampyhost documentation).
    """
    availablePlugins = vampyhost.listPlugins()
    return availablePlugins
|
Chris@56
|
10
|
Chris@56
|
def framesFromArray(arr, stepSize, frameSize):
    """Generate frames of frameSize samples, taken from arr every stepSize samples.

    arr is a numpy array, either 1-d (a single channel of samples) or
    2-d with shape (channels, samples); a 1-d input is treated as a
    single channel. Each yielded frame is a 2-d array of shape
    (channels, frameSize). A frame that runs past the end of the input
    is padded on the right with zeros of the input's own dtype, so
    every frame has the same shape and dtype as the others.

    stepSize must be positive, and arr must have one or two dimensions;
    both conditions are checked with assertions.
    """
    # A non-positive step would never advance past the end of the input.
    assert stepSize > 0
    if arr.ndim == 1:  # turn 1d into 2d array with 1 channel
        arr = numpy.reshape(arr, (1, arr.shape[0]))
    assert arr.ndim == 2
    n = arr.shape[1]
    i = 0
    while i < n:
        frame = arr[:, i : i + frameSize]
        shortfall = frameSize - frame.shape[1]
        if shortfall > 0:
            # Pad with the input's dtype: numpy.zeros defaults to
            # float64, which would make the trailing frame(s) a
            # different dtype from the full frames.
            pad = numpy.zeros((frame.shape[0], shortfall), dtype=arr.dtype)
            frame = numpy.concatenate((frame, pad), 1)
        yield frame
        i = i + stepSize
|
Chris@56
|
28
|
Chris@68
|
29
|
Chris@74
|
def loadAndConfigureFor(data, sampleRate, key, parameters):
    """Load the plugin identified by key and initialise it to suit data.

    The plugin is loaded through vampyhost with the AdaptInputDomain
    and AdaptChannelCount adapter flags, given the supplied parameter
    values, and then initialised with a channel count taken from data
    and the step and block sizes chosen below.

    Returns a tuple (plugin, stepSize, blockSize).
    """
    adapterFlags = vampyhost.AdaptInputDomain + vampyhost.AdaptChannelCount
    plugin = vampyhost.loadPlugin(key, sampleRate, adapterFlags)

    plugin.setParameterValues(parameters)

    preferredStep = plugin.getPreferredStepSize()
    preferredBlock = plugin.getPreferredBlockSize()

    # A preferred size of 0 is replaced by a default: 1024 for the
    # block size, and the (possibly defaulted) block size for the step.
    blockSize = 1024 if preferredBlock == 0 else preferredBlock
    ##!!! or blockSize/2, but check this with input domain adapter
    stepSize = blockSize if preferredStep == 0 else preferredStep

    # Multi-dimensional data carries one channel per row; anything
    # else is treated as a single channel.
    channelCount = data.shape[0] if data.ndim > 1 else 1

    plugin.initialise(channelCount, stepSize, blockSize)
    return (plugin, stepSize, blockSize)
|
Chris@68
|
51
|
Chris@68
|
52
|
Chris@74
|
def process(data, sampleRate, key, parameters=None, outputs=None):
    """Run the plugin identified by key over data, yielding features as they are produced.

    data is the audio to analyse (a numpy array, as accepted by
    framesFromArray), sampleRate its sample rate, and key the plugin
    key passed to loadAndConfigureFor. parameters is an optional dict
    of plugin parameter values (default: none). outputs is an optional
    list of output identifiers to return features for; if it is empty
    or omitted, the plugin's first output is used. Every requested
    identifier must be one of the plugin's outputs (checked with an
    assertion).

    This is a generator. Each item yielded is a single-entry dict
    { outputIdentifier: feature }. Features returned while processing
    the blocks are yielded first, followed by any remaining features
    returned at the end. If the plugin has no outputs, nothing is
    yielded.

    The plugin is always unloaded when the generator finishes, fails,
    or is closed before being exhausted.
    """
    if parameters is None:
        parameters = {}

    plug, stepSize, blockSize = loadAndConfigureFor(data, sampleRate, key, parameters)

    # try/finally so that the plugin is unloaded on every exit path:
    # early return, failed assertion, exception from the plugin, or the
    # consumer abandoning the generator part-way through.
    try:
        plugOuts = plug.getOutputs()
        if not plugOuts:
            return

        # output identifier -> output number
        outIndices = dict((o["identifier"], n) for n, o in enumerate(plugOuts))

        if outputs:
            for o in outputs:
                assert o in outIndices
        else:
            outputs = [plugOuts[0]["identifier"]]

        fi = 0  # index of the first sample of the current frame

        #!!! should we fill in the correct timestamps here?

        for f in framesFromArray(data, stepSize, blockSize):
            results = plug.processBlock(f, vampyhost.frame2RealTime(fi, sampleRate))
            # results is a dict mapping output number -> list of feature dicts
            for o in outputs:
                if outIndices[o] in results:
                    for r in results[outIndices[o]]:
                        yield { o: r }
            fi = fi + stepSize

        results = plug.getRemainingFeatures()
        for o in outputs:
            if outIndices[o] in results:
                for r in results[outIndices[o]]:
                    yield { o: r }
    finally:
        plug.unload()
|
Chris@66
|
92
|
Chris@66
|
93
|
Chris@74
|
def selectFeaturesForOutput(output, features):
    """Yield the items stored under the key output in each element of features.

    features is an iterable of containers (e.g. dicts). Elements that
    do not contain output are skipped; for every other element, each
    item of element[output] is yielded in turn.
    """
    for featureSet in features:
        if output not in featureSet:
            continue
        for feature in featureSet[output]:
            yield feature
|
Chris@74
|
99
|
Chris@72
|
100 ##!!!
|
Chris@72
|
101 ##
|
Chris@72
|
102 ## We could also devise a generator for the timestamps that need
|
Chris@72
|
103 ## filling: provide the output type & rate and get back a timestamp
|
Chris@72
|
104 ## generator
|
Chris@72
|
105 ##
|
Chris@72
|
106 ##!!!
|
Chris@72
|
107
|
Chris@74
|
108 # def timestampFeatures(sampleRate, stepSize, outputDescriptor, features):
|
Chris@72
|
109
|
Chris@74
|
110 # n = 0
|
Chris@71
|
111
|
Chris@74
|
112 # if outputDict.sampleType == vampyhost.OneSamplePerStep:
|
Chris@74
|
113 # while True:
|
Chris@74
|
114 # yield vampyhost.frame2RealTime(n * stepSize, sampleRate)
|
Chris@74
|
115 # n = n + 1
|
Chris@74
|
116
|
Chris@74
|
117 # elif outputDict.sampleType == vampyhost.FixedSampleRate:
|
Chris@74
|
118 # while True:
|
Chris@74
|
119
|
Chris@74
|
120
|
Chris@74
|
121
|
Chris@74
|
def collect(data, sampleRate, key, parameters=None, output=""):
    """Load and configure a plugin for data and select one of its outputs.

    NOTE: this function is unfinished. It loads and initialises the
    plugin identified by key (via loadAndConfigureFor), picks the
    output whose identifier equals output (or the first output if
    output is the empty string), and then returns an empty dict
    without processing any audio.

    Returns None if the plugin has no outputs, otherwise {}. Fails an
    assertion if no output with the requested identifier exists. The
    plugin is unloaded on every exit path.
    """
    if parameters is None:
        parameters = {}

    # stepSize and blockSize are not used yet; they are needed for
    # framing the input once the processing step is implemented.
    plug, stepSize, blockSize = loadAndConfigureFor(data, sampleRate, key, parameters)

    # try/finally so the plugin is unloaded even on the early return
    # or when the assertion below fails.
    try:
        plugOuts = plug.getOutputs()
        if not plugOuts:
            return

        outNo = -1
        for n, o in enumerate(plugOuts):
            if output == "" or o["identifier"] == output:
                outNo = n
                break

        assert outNo >= 0 #!!! todo proper error reporting

        #!!! todo! run framesFromArray(data, stepSize, blockSize) through
        #!!! the plugin and collect the features for output number outNo

        return {}
    finally:
        plug.unload()
|
Chris@71
|
146
|
Chris@71
|
147
|