Chris@56
|
1 '''A high-level interface to the vampyhost extension module, for quickly and easily running Vamp audio analysis plugins on audio files and buffers.'''
|
Chris@56
|
2
|
Chris@56
|
3 import vampyhost
|
Chris@75
|
4 import load
|
Chris@88
|
5 import process
|
Chris@89
|
6 import frames
|
Chris@89
|
7
|
Chris@93
|
8 import numpy as np
|
Chris@93
|
9
|
Chris@72
|
10
|
Chris@85
|
11 def timestamp_features(sample_rate, step_size, output_desc, features):
|
Chris@85
|
12 n = -1
|
Chris@85
|
13 if output_desc["sample_type"] == vampyhost.ONE_SAMPLE_PER_STEP:
|
Chris@85
|
14 for f in features:
|
Chris@85
|
15 n = n + 1
|
Chris@85
|
16 t = vampyhost.frame_to_realtime(n * step_size, sample_rate)
|
Chris@85
|
17 f["timestamp"] = t
|
Chris@85
|
18 yield f
|
Chris@85
|
19 elif output_desc["sample_type"] == vampyhost.FIXED_SAMPLE_RATE:
|
Chris@85
|
20 output_rate = output_desc["sample_rate"]
|
Chris@85
|
21 for f in features:
|
Chris@85
|
22 if "has_timestamp" in f:
|
Chris@85
|
23 n = int(f["timestamp"].to_float() * output_rate + 0.5)
|
Chris@85
|
24 else:
|
Chris@85
|
25 n = n + 1
|
Chris@85
|
26 f["timestamp"] = vampyhost.RealTime('seconds', float(n) / output_rate)
|
Chris@85
|
27 yield f
|
Chris@85
|
28 else:
|
Chris@85
|
29 for f in features:
|
Chris@85
|
30 yield f
|
Chris@72
|
31
|
Chris@74
|
32
|
Chris@93
|
33 def fill_timestamps(results, sample_rate, step_size, output_desc):
|
Chris@93
|
34
|
Chris@93
|
35 output = output_desc["identifier"]
|
Chris@93
|
36
|
Chris@93
|
37 selected = [ r[output] for r in results ]
|
Chris@93
|
38
|
Chris@93
|
39 stamped = timestamp_features(sample_rate, step_size, output_desc, selected)
|
Chris@93
|
40
|
Chris@93
|
41 for s in stamped:
|
Chris@93
|
42 yield s
|
Chris@93
|
43
|
Chris@93
|
44
|
Chris@93
|
45 def deduce_shape(output_desc):
|
Chris@93
|
46 if output_desc["has_duration"]:
|
Chris@93
|
47 return "individual"
|
Chris@93
|
48 if output_desc["sample_type"] == vampyhost.VARIABLE_SAMPLE_RATE:
|
Chris@93
|
49 return "individual"
|
Chris@93
|
50 if not output_desc["has_fixed_bin_count"]:
|
Chris@93
|
51 return "individual"
|
Chris@93
|
52 if output_desc["bin_count"] == 0:
|
Chris@93
|
53 return "individual"
|
Chris@93
|
54 if output_desc["bin_count"] > 1:
|
Chris@93
|
55 return "matrix"
|
Chris@93
|
56 return "vector"
|
Chris@93
|
57
|
Chris@93
|
58
|
Chris@93
|
59 def process_and_reshape(data, sample_rate, key, output, parameters = {}):
|
Chris@88
|
60
|
Chris@89
|
61 plugin, step_size, block_size = load.load_and_configure(data, sample_rate, key, parameters)
|
Chris@71
|
62
|
Chris@88
|
63 if output == "":
|
Chris@89
|
64 output_desc = plugin.get_output(0)
|
Chris@89
|
65 output = output_desc["identifier"]
|
Chris@88
|
66 else:
|
Chris@89
|
67 output_desc = plugin.get_output(output)
|
Chris@71
|
68
|
Chris@89
|
69 ff = frames.frames_from_array(data, step_size, block_size)
|
Chris@89
|
70
|
Chris@89
|
71 results = process.process_frames_with_plugin(ff, sample_rate, step_size, plugin, [output])
|
Chris@89
|
72
|
Chris@93
|
73 shape = deduce_shape(output_desc)
|
Chris@89
|
74
|
Chris@93
|
75 if shape == "vector":
|
Chris@93
|
76 rv = np.array([r[output]["values"][0] for r in results])
|
Chris@93
|
77 elif shape == "matrix":
|
Chris@93
|
78 rv = np.array(
|
Chris@93
|
79 [[r[output]["values"][i] for r in results]
|
Chris@93
|
80 for i in range(0, output_desc["bin_count"]-1)])
|
Chris@93
|
81 else:
|
Chris@93
|
82 rv = list(fill_timestamps(results, sample_rate, step_size, output_desc))
|
Chris@89
|
83
|
Chris@89
|
84 plugin.unload()
|
Chris@93
|
85 return rv
|
Chris@93
|
86
|
Chris@71
|
87
|
Chris@89
|
88 def collect(data, sample_rate, key, output, parameters = {}):
|
Chris@93
|
89 return process_and_reshape(data, sample_rate, key, output, parameters)
|