annotate vamp/collect.py @ 96:f0e005248b9a

More collect tweaks -- currently failing tests
author Chris Cannam
date Mon, 02 Feb 2015 17:31:30 +0000
parents 3e5791890b65
children 06c4afba4fc5
rev   line source
Chris@56 1 '''A high-level interface to the vampyhost extension module, for quickly and easily running Vamp audio analysis plugins on audio files and buffers.'''
Chris@56 2
Chris@56 3 import vampyhost
Chris@75 4 import load
Chris@88 5 import process
Chris@89 6 import frames
Chris@89 7
Chris@93 8 import numpy as np
Chris@93 9
def get_feature_step_time(sample_rate, step_size, output_desc):
    '''Return the time step between consecutive features of an output.

    The result depends on output_desc["sample_type"]:

    - vampyhost.ONE_SAMPLE_PER_STEP: step_size frames at sample_rate,
      converted with vampyhost.frame_to_realtime.
    - vampyhost.FIXED_SAMPLE_RATE: a vampyhost.RealTime of
      1 / output_desc["sample_rate"] seconds.
    - anything else: the plain integer 1 (not a RealTime).
    '''
    sample_type = output_desc["sample_type"]

    if sample_type == vampyhost.ONE_SAMPLE_PER_STEP:
        return vampyhost.frame_to_realtime(step_size, sample_rate)

    if sample_type == vampyhost.FIXED_SAMPLE_RATE:
        seconds_per_feature = 1.0 / output_desc["sample_rate"]
        return vampyhost.RealTime('seconds', seconds_per_feature)

    return 1
Chris@72 17
def timestamp_features(sample_rate, step_size, output_desc, features):
    '''Yield each feature in turn, filling in its "timestamp" entry.

    The feature dicts are modified in place and yielded in their
    original order. How the timestamp is derived depends on
    output_desc["sample_type"]:

    - vampyhost.ONE_SAMPLE_PER_STEP: the i-th feature (counting from
      zero) is stamped with the time of frame i * step_size at
      sample_rate.
    - vampyhost.FIXED_SAMPLE_RATE: timestamps are multiples of
      1 / output_desc["sample_rate"]. A feature containing the key
      "has_timestamp" has its existing timestamp rounded to the nearest
      such multiple; any other feature takes the multiple following
      the previous feature's.
    - anything else: features are passed through untouched.
    '''
    sample_type = output_desc["sample_type"]

    if sample_type == vampyhost.ONE_SAMPLE_PER_STEP:
        for index, feature in enumerate(features):
            feature["timestamp"] = vampyhost.frame_to_realtime(
                index * step_size, sample_rate)
            yield feature
        return

    if sample_type == vampyhost.FIXED_SAMPLE_RATE:
        rate = output_desc["sample_rate"]
        # Position of the most recently emitted feature, in units of
        # 1 / rate; -1 so that the first unstamped feature lands on 0.
        index = -1
        for feature in features:
            if "has_timestamp" in feature:
                index = int(feature["timestamp"].to_float() * rate + 0.5)
            else:
                index += 1
            feature["timestamp"] = vampyhost.RealTime(
                'seconds', float(index) / rate)
            yield feature
        return

    for feature in features:
        yield feature
Chris@72 38
def fill_timestamps(results, sample_rate, step_size, output_desc):
    '''Yield the features of one output from results, with timestamps.

    Each item of results is indexed by output_desc["identifier"] to
    obtain a feature; the features are then passed through
    timestamp_features and yielded one by one. All of results is read
    before the first feature is yielded.
    '''
    key = output_desc["identifier"]
    features = [result[key] for result in results]

    for feature in timestamp_features(sample_rate, step_size,
                                      output_desc, features):
        yield feature
Chris@93 49
def deduce_shape(output_desc):
    '''Classify an output as "individual", "vector" or "matrix".

    An output is "individual" when its features have a duration, when
    its sample type is vampyhost.VARIABLE_SAMPLE_RATE, when it has no
    fixed bin count, or when its bin count is zero. Otherwise a bin
    count of one gives "vector" and any other bin count gives "matrix".
    '''
    irregular = (output_desc["has_duration"]
                 or output_desc["sample_type"] == vampyhost.VARIABLE_SAMPLE_RATE
                 or not output_desc["has_fixed_bin_count"])
    if irregular:
        return "individual"

    bins = output_desc["bin_count"]
    if bins == 0:
        return "individual"
    return "vector" if bins == 1 else "matrix"
Chris@93 62
Chris@88 63
def reshape(results, sample_rate, step_size, output_desc):
    '''Collect the features of one output into a single return value.

    results is an iterable whose items are indexed by the output
    identifier (output_desc["identifier"]) to obtain a feature. It is
    read exactly once, so a generator or other one-shot iterator is
    acceptable.

    The form of the return value follows deduce_shape(output_desc):

    - "vector": a tuple (step, values), where step comes from
      get_feature_step_time and values is a 1-D float32 array holding
      the first value of each feature.
    - "matrix": a tuple (step, values), where values is a 2-D float32
      array with one row per bin (output_desc["bin_count"] rows) and
      one column per feature.
    - otherwise: a list of the features with timestamps filled in by
      fill_timestamps.
    '''
    output = output_desc["identifier"]
    shape = deduce_shape(output_desc)

    if shape == "vector":
        out_step = get_feature_step_time(sample_rate, step_size, output_desc)
        column = [r[output]["values"][0] for r in results]
        rv = (out_step, np.array(column, np.float32))
    elif shape == "matrix":
        out_step = get_feature_step_time(sample_rate, step_size, output_desc)
        # Pull the value lists out in a single pass: iterating results
        # once per bin would leave every row after the first empty
        # whenever results is a generator.
        per_feature = [r[output]["values"] for r in results]
        rows = [[values[i] for values in per_feature]
                for i in range(0, output_desc["bin_count"])]
        rv = (out_step, np.array(rows, np.float32))
    else:
        # No step time is needed here, so none is computed.
        rv = list(fill_timestamps(results, sample_rate, step_size, output_desc))

    return rv
Chris@95 82
Chris@95 83
def collect(data, sample_rate, key, output = "", parameters = {}):
    '''Run a plugin over data and return one output's features, reshaped.

    The plugin identified by key is loaded and configured through
    load.load_and_configure, data is cut into frames with
    frames.frames_from_array, and the frames are processed with
    process.process_frames_with_plugin. The features are then passed to
    reshape, whose result is returned: a (step, array) tuple for
    vector- and matrix-shaped outputs, or a list of timestamped
    features otherwise.

    output is the identifier of the output to collect; the empty string
    (the default) selects the plugin's output number 0. parameters is
    handed to load.load_and_configure unchanged and is not modified
    here.

    The plugin is unloaded before returning, including when an
    exception is raised after it has been loaded.
    '''
    plugin, step_size, block_size = load.load_and_configure(data, sample_rate, key, parameters)

    try:
        if output == "":
            output_desc = plugin.get_output(0)
            output = output_desc["identifier"]
        else:
            output_desc = plugin.get_output(output)

        ff = frames.frames_from_array(data, step_size, block_size)

        results = process.process_frames_with_plugin(ff, sample_rate, step_size, plugin, [output])

        # reshape consumes results completely, so the plugin is no
        # longer needed once it returns.
        return reshape(results, sample_rate, step_size, output_desc)
    finally:
        plugin.unload()
Chris@93 102
Chris@95 103
def collect_frames(ff, channels, sample_rate, step_size, key, output = "", parameters = {}, block_size = None):
    '''Run a plugin over ready-made frames and return one output, reshaped.

    ff is an iterable of frames, passed unchanged to
    process.process_frames_with_plugin. The plugin identified by key is
    loaded with the input-domain, buffer-size and channel-count
    adapters enabled, given the supplied parameters, and initialised
    with channels, step_size and block_size.

    block_size is optional. When omitted it defaults to step_size.
    NOTE(review): this default assumes non-overlapping frames, which
    is presumably what the buffer-size adapter expects -- confirm
    against the callers.

    output is the identifier of the output to collect; the empty string
    (the default) selects the plugin's output number 0.

    Returns whatever reshape returns for the chosen output: a
    (step, array) tuple for vector- and matrix-shaped outputs, or a
    list of timestamped features otherwise.

    Raises RuntimeError if the plugin fails to initialise. The plugin
    is unloaded before returning or raising.
    '''
    plugin = vampyhost.load_plugin(key, sample_rate,
                                   vampyhost.ADAPT_INPUT_DOMAIN +
                                   vampyhost.ADAPT_BUFFER_SIZE +
                                   vampyhost.ADAPT_CHANNEL_COUNT)

    try:
        plugin.set_parameter_values(parameters)

        if block_size is None:
            block_size = step_size

        if not plugin.initialise(channels, step_size, block_size):
            raise RuntimeError("Failed to initialise plugin")

        if output == "":
            output_desc = plugin.get_output(0)
            output = output_desc["identifier"]
        else:
            output_desc = plugin.get_output(output)

        results = process.process_frames_with_plugin(ff, sample_rate, step_size, plugin, [output])

        # reshape consumes results completely, so the plugin is no
        # longer needed once it returns.
        return reshape(results, sample_rate, step_size, output_desc)
    finally:
        plugin.unload()
Chris@95 128