Chris@56
|
1 '''A high-level interface to the vampyhost extension module, for quickly and easily running Vamp audio analysis plugins on audio files and buffers.'''
|
Chris@56
|
2
|
Chris@68
|
3 ###!!! todo: move all the real code out of __init__.py
|
Chris@68
|
4
|
Chris@56
|
5 import vampyhost
|
Chris@56
|
6 import numpy
|
Chris@56
|
7
|
Chris@56
|
8 def listPlugins():
|
Chris@56
|
9 return vampyhost.listPlugins()
|
Chris@56
|
10
|
Chris@56
|
11 def framesFromArray(arr, stepSize, frameSize):
|
Chris@56
|
12 """Generate a list of frames of size frameSize, extracted from the input array arr at stepSize intervals"""
|
Chris@56
|
13 # presumably such a function exists in many places, but I need practice
|
Chris@56
|
14 assert(stepSize > 0)
|
Chris@56
|
15 if arr.ndim == 1: # turn 1d into 2d array with 1 channel
|
Chris@56
|
16 arr = numpy.reshape(arr, (1, arr.shape[0]))
|
Chris@56
|
17 assert(arr.ndim == 2)
|
Chris@56
|
18 n = arr.shape[1]
|
Chris@56
|
19 i = 0
|
Chris@56
|
20 while (i < n):
|
Chris@56
|
21 frame = arr[:, i : i + frameSize]
|
Chris@56
|
22 w = frame.shape[1]
|
Chris@56
|
23 if (w < frameSize):
|
Chris@56
|
24 pad = numpy.zeros((frame.shape[0], frameSize - w))
|
Chris@56
|
25 frame = numpy.concatenate((frame, pad), 1)
|
Chris@56
|
26 yield frame
|
Chris@56
|
27 i = i + stepSize
|
Chris@56
|
28
|
Chris@68
|
29
|
Chris@68
|
30 def loadAndConfigureFor(data, samplerate, key, parameters):
|
Chris@58
|
31 plug = vampyhost.loadPlugin(key, samplerate,
|
Chris@58
|
32 vampyhost.AdaptInputDomain +
|
Chris@58
|
33 vampyhost.AdaptChannelCount)
|
Chris@64
|
34
|
Chris@68
|
35 plug.setParameterValues(parameters)
|
Chris@68
|
36
|
Chris@68
|
37 stepSize = plug.getPreferredStepSize()
|
Chris@68
|
38 blockSize = plug.getPreferredBlockSize()
|
Chris@68
|
39
|
Chris@68
|
40 if blockSize == 0:
|
Chris@68
|
41 blockSize = 1024
|
Chris@68
|
42 if stepSize == 0:
|
Chris@68
|
43 stepSize = blockSize ##!!! or blockSize/2, but check this with input domain adapter
|
Chris@68
|
44
|
Chris@68
|
45 channels = 1
|
Chris@68
|
46 if data.ndim > 1:
|
Chris@68
|
47 channels = data.shape[0]
|
Chris@68
|
48
|
Chris@68
|
49 plug.initialise(channels, stepSize, blockSize)
|
Chris@68
|
50 return (plug, stepSize, blockSize)
|
Chris@68
|
51
|
Chris@68
|
52
|
Chris@68
|
53 def process(data, samplerate, key, parameters = {}, outputs = []):
|
Chris@68
|
54 #!!! docstring
|
Chris@68
|
55
|
Chris@68
|
56 plug, stepSize, blockSize = loadAndConfigureFor(data, samplerate, key, parameters)
|
Chris@68
|
57
|
Chris@64
|
58 plugOuts = plug.getOutputs()
|
Chris@68
|
59 if plugOuts == []:
|
Chris@68
|
60 return
|
Chris@64
|
61
|
Chris@64
|
62 outIndices = dict(zip([o["identifier"] for o in plugOuts],
|
Chris@64
|
63 range(0, len(plugOuts)))) # id -> n
|
Chris@64
|
64
|
Chris@64
|
65 for o in outputs:
|
Chris@64
|
66 assert o in outIndices
|
Chris@64
|
67
|
Chris@66
|
68 if outputs == []:
|
Chris@67
|
69 outputs = [plugOuts[0]["identifier"]]
|
Chris@66
|
70
|
Chris@56
|
71 ff = framesFromArray(data, stepSize, blockSize)
|
Chris@61
|
72 fi = 0
|
Chris@64
|
73
|
Chris@71
|
74 #!!! should we fill in the correct timestamps here?
|
Chris@71
|
75
|
Chris@61
|
76 for f in ff:
|
Chris@64
|
77 results = plug.processBlock(f, vampyhost.frame2RealTime(fi, samplerate))
|
Chris@64
|
78 # results is a dict mapping output number -> list of feature dicts
|
Chris@71
|
79 print("results = " + str(results))
|
Chris@66
|
80 for o in outputs:
|
Chris@66
|
81 if outIndices[o] in results:
|
Chris@66
|
82 for r in results[outIndices[o]]:
|
Chris@70
|
83 yield { o: r }
|
Chris@61
|
84 fi = fi + stepSize
|
Chris@61
|
85
|
Chris@66
|
86 results = plug.getRemainingFeatures()
|
Chris@66
|
87 for o in outputs:
|
Chris@66
|
88 if outIndices[o] in results:
|
Chris@66
|
89 for r in results[outIndices[o]]:
|
Chris@70
|
90 yield { o: r }
|
Chris@66
|
91
|
Chris@66
|
92 plug.unload()
|
Chris@66
|
93
|
Chris@66
|
94
|
Chris@72
|
95 ##!!!
|
Chris@72
|
96 ##
|
Chris@72
|
97 ## We could also devise a generator for the timestamps that need
|
Chris@72
|
98 ## filling: provide the output type & rate and get back a timestamp
|
Chris@72
|
99 ## generator
|
Chris@72
|
100 ##
|
Chris@72
|
101 ##!!!
|
Chris@72
|
102
|
Chris@72
|
103
|
Chris@71
|
104 def collect(data, samplerate, key, parameters = {}, output = ""):
|
Chris@71
|
105
|
Chris@71
|
106 plug, stepSize, blockSize = loadAndConfigureFor(data, samplerate, key, parameters)
|
Chris@71
|
107
|
Chris@71
|
108 plugOuts = plug.getOutputs()
|
Chris@71
|
109 if plugOuts == []:
|
Chris@71
|
110 return
|
Chris@71
|
111
|
Chris@71
|
112 outNo = -1
|
Chris@71
|
113 for n, o in zip(range(0, len(plugOuts)), plugOuts):
|
Chris@71
|
114 if output == "" or o["identifier"] == output:
|
Chris@71
|
115 outNo = n
|
Chris@71
|
116 break
|
Chris@71
|
117
|
Chris@71
|
118 assert outNo >= 0 #!!! todo proper error reporting
|
Chris@71
|
119
|
Chris@71
|
120 ff = framesFromArray(data, stepSize, blockSize)
|
Chris@71
|
121 fi = 0
|
Chris@71
|
122
|
Chris@71
|
123 #!!! todo!
|
Chris@71
|
124
|
Chris@71
|
125 plug.unload()
|
Chris@71
|
126
|
Chris@71
|
127 return {}
|
Chris@71
|
128
|
Chris@71
|
129
|