Chris@56
|
1 '''A high-level interface to the vampyhost extension module, for quickly and easily running Vamp audio analysis plugins on audio files and buffers.'''
|
Chris@56
|
2
|
Chris@56
|
3 import vampyhost
|
Chris@56
|
4 import numpy
|
Chris@56
|
5
|
Chris@56
|
6 def listPlugins():
|
Chris@56
|
7 return vampyhost.listPlugins()
|
Chris@56
|
8
|
Chris@56
|
9 def framesFromArray(arr, stepSize, frameSize):
|
Chris@56
|
10 """Generate a list of frames of size frameSize, extracted from the input array arr at stepSize intervals"""
|
Chris@56
|
11 # presumably such a function exists in many places, but I need practice
|
Chris@56
|
12 assert(stepSize > 0)
|
Chris@56
|
13 if arr.ndim == 1: # turn 1d into 2d array with 1 channel
|
Chris@56
|
14 arr = numpy.reshape(arr, (1, arr.shape[0]))
|
Chris@56
|
15 assert(arr.ndim == 2)
|
Chris@56
|
16 n = arr.shape[1]
|
Chris@56
|
17 i = 0
|
Chris@56
|
18 while (i < n):
|
Chris@56
|
19 frame = arr[:, i : i + frameSize]
|
Chris@56
|
20 w = frame.shape[1]
|
Chris@56
|
21 if (w < frameSize):
|
Chris@56
|
22 pad = numpy.zeros((frame.shape[0], frameSize - w))
|
Chris@56
|
23 frame = numpy.concatenate((frame, pad), 1)
|
Chris@56
|
24 yield frame
|
Chris@56
|
25 i = i + stepSize
|
Chris@56
|
26
|
Chris@65
|
27 def process(data, samplerate, key, parameters = {}, outputs = []):
|
Chris@65
|
28 #!!! docstring
|
Chris@58
|
29 plug = vampyhost.loadPlugin(key, samplerate,
|
Chris@58
|
30 vampyhost.AdaptInputDomain +
|
Chris@58
|
31 vampyhost.AdaptChannelCount)
|
Chris@64
|
32
|
Chris@64
|
33 plugOuts = plug.getOutputs()
|
Chris@64
|
34
|
Chris@64
|
35 outIndices = dict(zip([o["identifier"] for o in plugOuts],
|
Chris@64
|
36 range(0, len(plugOuts)))) # id -> n
|
Chris@64
|
37
|
Chris@64
|
38 for o in outputs:
|
Chris@64
|
39 assert o in outIndices
|
Chris@64
|
40
|
Chris@66
|
41 if outputs == []:
|
Chris@67
|
42 outputs = [plugOuts[0]["identifier"]]
|
Chris@66
|
43
|
Chris@66
|
44 singleOutput = (len(outputs) == 1)
|
Chris@66
|
45
|
Chris@56
|
46 stepSize = plug.getPreferredStepSize()
|
Chris@56
|
47 blockSize = plug.getPreferredBlockSize()
|
Chris@61
|
48 if blockSize == 0:
|
Chris@61
|
49 blockSize = 1024
|
Chris@61
|
50 if stepSize == 0:
|
Chris@61
|
51 stepSize = blockSize ##!!! or blockSize/2, but check this with input domain adapter
|
Chris@61
|
52 channels = 1
|
Chris@61
|
53 if data.ndim > 1:
|
Chris@61
|
54 channels = data.shape[0]
|
Chris@64
|
55
|
Chris@61
|
56 plug.initialise(channels, stepSize, blockSize)
|
Chris@56
|
57 ff = framesFromArray(data, stepSize, blockSize)
|
Chris@61
|
58 fi = 0
|
Chris@64
|
59
|
Chris@61
|
60 for f in ff:
|
Chris@64
|
61 results = plug.processBlock(f, vampyhost.frame2RealTime(fi, samplerate))
|
Chris@64
|
62 # results is a dict mapping output number -> list of feature dicts
|
Chris@66
|
63 for o in outputs:
|
Chris@66
|
64 if outIndices[o] in results:
|
Chris@66
|
65 for r in results[outIndices[o]]:
|
Chris@66
|
66 if singleOutput:
|
Chris@66
|
67 yield r
|
Chris@66
|
68 else:
|
Chris@64
|
69 yield { o: r }
|
Chris@61
|
70 fi = fi + stepSize
|
Chris@61
|
71
|
Chris@66
|
72 results = plug.getRemainingFeatures()
|
Chris@66
|
73 for o in outputs:
|
Chris@66
|
74 if outIndices[o] in results:
|
Chris@66
|
75 for r in results[outIndices[o]]:
|
Chris@66
|
76 if singleOutput:
|
Chris@66
|
77 yield r
|
Chris@66
|
78 else:
|
Chris@66
|
79 yield { o: r }
|
Chris@66
|
80
|
Chris@66
|
81 plug.unload()
|
Chris@66
|
82
|
Chris@66
|
83
|