Chris@56
|
1 '''A high-level interface to the vampyhost extension module, for quickly and easily running Vamp audio analysis plugins on audio files and buffers.'''
|
Chris@56
|
2
|
Chris@56
|
3 import vampyhost
|
Chris@75
|
4 import load
|
Chris@88
|
5 import process
|
Chris@89
|
6 import frames
|
Chris@89
|
7
|
Chris@93
|
8 import numpy as np
|
Chris@93
|
9
|
Chris@94
|
10 def get_feature_step_time(sample_rate, step_size, output_desc):
|
Chris@94
|
11 if output_desc["sample_type"] == vampyhost.ONE_SAMPLE_PER_STEP:
|
Chris@94
|
12 return vampyhost.frame_to_realtime(step_size, sample_rate)
|
Chris@94
|
13 elif output_desc["sample_type"] == vampyhost.FIXED_SAMPLE_RATE:
|
Chris@94
|
14 return vampyhost.RealTime('seconds', 1.0 / output_desc["sample_rate"])
|
Chris@94
|
15 else:
|
Chris@94
|
16 return 1
|
Chris@72
|
17
|
Chris@85
|
18 def timestamp_features(sample_rate, step_size, output_desc, features):
|
Chris@85
|
19 n = -1
|
Chris@85
|
20 if output_desc["sample_type"] == vampyhost.ONE_SAMPLE_PER_STEP:
|
Chris@85
|
21 for f in features:
|
Chris@85
|
22 n = n + 1
|
Chris@85
|
23 t = vampyhost.frame_to_realtime(n * step_size, sample_rate)
|
Chris@85
|
24 f["timestamp"] = t
|
Chris@85
|
25 yield f
|
Chris@85
|
26 elif output_desc["sample_type"] == vampyhost.FIXED_SAMPLE_RATE:
|
Chris@85
|
27 output_rate = output_desc["sample_rate"]
|
Chris@85
|
28 for f in features:
|
Chris@85
|
29 if "has_timestamp" in f:
|
Chris@85
|
30 n = int(f["timestamp"].to_float() * output_rate + 0.5)
|
Chris@85
|
31 else:
|
Chris@85
|
32 n = n + 1
|
Chris@85
|
33 f["timestamp"] = vampyhost.RealTime('seconds', float(n) / output_rate)
|
Chris@85
|
34 yield f
|
Chris@85
|
35 else:
|
Chris@85
|
36 for f in features:
|
Chris@85
|
37 yield f
|
Chris@72
|
38
|
Chris@93
|
39 def fill_timestamps(results, sample_rate, step_size, output_desc):
|
Chris@93
|
40
|
Chris@93
|
41 output = output_desc["identifier"]
|
Chris@93
|
42
|
Chris@93
|
43 selected = [ r[output] for r in results ]
|
Chris@93
|
44
|
Chris@93
|
45 stamped = timestamp_features(sample_rate, step_size, output_desc, selected)
|
Chris@93
|
46
|
Chris@93
|
47 for s in stamped:
|
Chris@93
|
48 yield s
|
Chris@93
|
49
|
Chris@93
|
50 def deduce_shape(output_desc):
|
Chris@93
|
51 if output_desc["has_duration"]:
|
Chris@93
|
52 return "individual"
|
Chris@93
|
53 if output_desc["sample_type"] == vampyhost.VARIABLE_SAMPLE_RATE:
|
Chris@93
|
54 return "individual"
|
Chris@93
|
55 if not output_desc["has_fixed_bin_count"]:
|
Chris@93
|
56 return "individual"
|
Chris@93
|
57 if output_desc["bin_count"] == 0:
|
Chris@93
|
58 return "individual"
|
Chris@94
|
59 if output_desc["bin_count"] == 1:
|
Chris@94
|
60 return "vector"
|
Chris@94
|
61 return "matrix"
|
Chris@93
|
62
|
Chris@88
|
63
|
Chris@95
|
64 def reshape(results, sample_rate, step_size, output_desc):
|
Chris@71
|
65
|
Chris@95
|
66 output = output_desc["identifier"]
|
Chris@93
|
67 shape = deduce_shape(output_desc)
|
Chris@94
|
68 out_step = get_feature_step_time(sample_rate, step_size, output_desc)
|
Chris@89
|
69
|
Chris@93
|
70 if shape == "vector":
|
Chris@94
|
71 rv = ( out_step,
|
Chris@96
|
72 np.array([r[output]["values"][0] for r in results], np.float32) )
|
Chris@93
|
73 elif shape == "matrix":
|
Chris@97
|
74 #!!! todo: check that each feature has the right number of bins?
|
Chris@97
|
75 outseq = [r[output]["values"] for r in results]
|
Chris@97
|
76 rv = ( out_step, np.array(outseq, np.float32) )
|
Chris@93
|
77 else:
|
Chris@93
|
78 rv = list(fill_timestamps(results, sample_rate, step_size, output_desc))
|
Chris@89
|
79
|
Chris@95
|
80 return rv
|
Chris@95
|
81
|
Chris@95
|
82
|
Chris@96
|
83 def collect(data, sample_rate, key, output = "", parameters = {}):
|
Chris@95
|
84
|
Chris@95
|
85 plugin, step_size, block_size = load.load_and_configure(data, sample_rate, key, parameters)
|
Chris@95
|
86
|
Chris@95
|
87 if output == "":
|
Chris@95
|
88 output_desc = plugin.get_output(0)
|
Chris@95
|
89 output = output_desc["identifier"]
|
Chris@95
|
90 else:
|
Chris@95
|
91 output_desc = plugin.get_output(output)
|
Chris@95
|
92
|
Chris@95
|
93 ff = frames.frames_from_array(data, step_size, block_size)
|
Chris@95
|
94
|
Chris@95
|
95 results = process.process_frames_with_plugin(ff, sample_rate, step_size, plugin, [output])
|
Chris@95
|
96
|
Chris@95
|
97 rv = reshape(results, sample_rate, step_size, output_desc)
|
Chris@95
|
98
|
Chris@89
|
99 plugin.unload()
|
Chris@93
|
100 return rv
|
Chris@93
|
101
|
Chris@95
|
102
|
Chris@96
|
103 def collect_frames(ff, channels, sample_rate, step_size, key, output = "", parameters = {}):
|
Chris@71
|
104
|
Chris@95
|
105 plug = vampyhost.load_plugin(key, sample_rate,
|
Chris@95
|
106 vampyhost.ADAPT_INPUT_DOMAIN +
|
Chris@95
|
107 vampyhost.ADAPT_BUFFER_SIZE +
|
Chris@95
|
108 vampyhost.ADAPT_CHANNEL_COUNT)
|
Chris@95
|
109
|
Chris@95
|
110 plug.set_parameter_values(parameters)
|
Chris@95
|
111
|
Chris@95
|
112 if not plug.initialise(channels, step_size, block_size):
|
Chris@95
|
113 raise "Failed to initialise plugin"
|
Chris@95
|
114
|
Chris@95
|
115 if output == "":
|
Chris@95
|
116 output_desc = plugin.get_output(0)
|
Chris@95
|
117 output = output_desc["identifier"]
|
Chris@95
|
118 else:
|
Chris@95
|
119 output_desc = plugin.get_output(output)
|
Chris@95
|
120
|
Chris@95
|
121 results = process.process_frames_with_plugin(ff, sample_rate, step_size, plugin, [output])
|
Chris@95
|
122
|
Chris@95
|
123 rv = reshape(results, sample_rate, step_size, output_desc)
|
Chris@95
|
124
|
Chris@95
|
125 plugin.unload()
|
Chris@95
|
126 return rv
|
Chris@95
|
127
|