annotate vamp/collect.py @ 97:06c4afba4fc5

Fix matrix return
author Chris Cannam
date Tue, 03 Feb 2015 10:29:21 +0000
parents f0e005248b9a
children 7764eb74a3c6
rev   line source
Chris@56 1 '''A high-level interface to the vampyhost extension module, for quickly and easily running Vamp audio analysis plugins on audio files and buffers.'''
Chris@56 2
Chris@56 3 import vampyhost
Chris@75 4 import load
Chris@88 5 import process
Chris@89 6 import frames
Chris@89 7
Chris@93 8 import numpy as np
Chris@93 9
def get_feature_step_time(sample_rate, step_size, output_desc):
    '''Return the time step between successive features of an output.

    output_desc is the output descriptor dictionary; its "sample_type"
    entry selects the result:

    * ONE_SAMPLE_PER_STEP: step_size frames converted to real time at
      sample_rate, via vampyhost.frame_to_realtime.
    * FIXED_SAMPLE_RATE: a vampyhost.RealTime holding the reciprocal of
      the descriptor's own "sample_rate", in seconds.
    * anything else: the plain integer 1.
    '''
    sample_type = output_desc["sample_type"]

    if sample_type == vampyhost.ONE_SAMPLE_PER_STEP:
        return vampyhost.frame_to_realtime(step_size, sample_rate)

    if sample_type == vampyhost.FIXED_SAMPLE_RATE:
        seconds_per_feature = 1.0 / output_desc["sample_rate"]
        return vampyhost.RealTime('seconds', seconds_per_feature)

    # No regular step can be derived for other sample types.
    return 1
Chris@72 17
def timestamp_features(sample_rate, step_size, output_desc, features):
    '''Yield each feature from features, setting "timestamp" where derivable.

    The features are modified in place and yielded in their original
    order. What is written depends on output_desc["sample_type"]:

    * ONE_SAMPLE_PER_STEP: the n-th feature (counting from zero) is
      stamped with frame n * step_size converted to real time at
      sample_rate.
    * FIXED_SAMPLE_RATE: each feature is assigned an index at the
      output's own "sample_rate" and stamped with index / rate seconds.
      A feature containing the key "has_timestamp" has its index
      recomputed from its existing timestamp (rounded to the nearest
      index); otherwise the index is one more than the previous one.
    * anything else: features are passed through untouched.
    '''
    sample_type = output_desc["sample_type"]

    if sample_type == vampyhost.ONE_SAMPLE_PER_STEP:
        for index, feature in enumerate(features):
            feature["timestamp"] = vampyhost.frame_to_realtime(
                index * step_size, sample_rate)
            yield feature
        return

    if sample_type == vampyhost.FIXED_SAMPLE_RATE:
        rate = output_desc["sample_rate"]
        index = -1
        for feature in features:
            # NOTE(review): this tests for the presence of the key
            # "has_timestamp" (not its value) and then reads
            # "timestamp" -- confirm that matches the feature
            # dictionaries the host actually produces.
            if "has_timestamp" in feature:
                # Adding 0.5 before truncating rounds to the nearest index.
                index = int(feature["timestamp"].to_float() * rate + 0.5)
            else:
                index += 1
            feature["timestamp"] = vampyhost.RealTime(
                'seconds', float(index) / rate)
            yield feature
        return

    for feature in features:
        yield feature
Chris@72 38
def fill_timestamps(results, sample_rate, step_size, output_desc):
    '''Yield the entries for one output from results, with timestamps filled in.

    Each element of results is indexed by the output's "identifier"
    (taken from output_desc); the selected entries are gathered into a
    list and handed to timestamp_features, whose items are yielded
    one by one.
    '''
    key = output_desc["identifier"]
    per_output = [result[key] for result in results]

    for feature in timestamp_features(sample_rate, step_size,
                                      output_desc, per_output):
        yield feature
Chris@93 49
def deduce_shape(output_desc):
    '''Classify an output descriptor as "individual", "vector" or "matrix".

    "individual" is returned when any of the following holds, checked
    in this order: the output has a duration, its sample type is
    VARIABLE_SAMPLE_RATE, it has no fixed bin count, or its bin count
    is zero. Otherwise a bin count of exactly one gives "vector" and
    any other bin count gives "matrix".
    '''
    # The "or" chain short-circuits, so the descriptor keys are read in
    # the same order as a sequence of separate checks would read them.
    unstructured = (
        output_desc["has_duration"]
        or output_desc["sample_type"] == vampyhost.VARIABLE_SAMPLE_RATE
        or not output_desc["has_fixed_bin_count"]
        or output_desc["bin_count"] == 0
    )
    if unstructured:
        return "individual"

    return "vector" if output_desc["bin_count"] == 1 else "matrix"
Chris@93 62
Chris@88 63
def reshape(results, sample_rate, step_size, output_desc):
    '''Convert per-block results for one output into its natural shape.

    The shape is chosen by deduce_shape(output_desc):

    * "vector": returns (step_time, array) where array is a float32
      numpy array built from the first value of each result.
    * "matrix": returns (step_time, array) where array is a float32
      numpy array built from the full "values" of each result.
    * otherwise: returns a list of the individual features with
      timestamps filled in by fill_timestamps.

    step_time is the value of get_feature_step_time for this output.
    '''
    key = output_desc["identifier"]
    layout = deduce_shape(output_desc)
    step_time = get_feature_step_time(sample_rate, step_size, output_desc)

    if layout not in ("vector", "matrix"):
        return list(fill_timestamps(results, sample_rate, step_size,
                                    output_desc))

    if layout == "vector":
        rows = [result[key]["values"][0] for result in results]
    else:
        #!!! todo: check that each feature has the right number of bins?
        rows = [result[key]["values"] for result in results]

    return (step_time, np.array(rows, np.float32))
Chris@95 81
Chris@95 82
def collect(data, sample_rate, key, output = "", parameters = None):
    '''Run a plugin over an array of audio data and return one output, reshaped.

    data         -- audio samples, passed to load.load_and_configure
                    and frames.frames_from_array
    sample_rate  -- sample rate of data
    key          -- identifier of the plugin to load
    output       -- identifier of the output to collect; the empty
                    string (the default) selects the plugin's output 0
    parameters   -- plugin parameter values handed to
                    load.load_and_configure; None (the default) means
                    an empty dictionary

    Returns whatever reshape produces for the selected output: either a
    (step_time, numpy array) pair or a list of timestamped features.

    The plugin is unloaded before returning, including when an
    exception is raised after it has been loaded.
    '''
    # Avoid a shared mutable default: a fresh dictionary per call.
    if parameters is None:
        parameters = {}

    plugin, step_size, block_size = load.load_and_configure(
        data, sample_rate, key, parameters)

    try:
        if output == "":
            output_desc = plugin.get_output(0)
            output = output_desc["identifier"]
        else:
            output_desc = plugin.get_output(output)

        ff = frames.frames_from_array(data, step_size, block_size)

        results = process.process_frames_with_plugin(
            ff, sample_rate, step_size, plugin, [output])

        # reshape consumes results completely (into an array or a list),
        # so it is safe to unload the plugin once it has returned.
        return reshape(results, sample_rate, step_size, output_desc)
    finally:
        # Release the plugin even if any of the steps above failed.
        plugin.unload()
Chris@93 101
Chris@95 102
def collect_frames(ff, channels, sample_rate, step_size, key, output = "", parameters = None, block_size = None):
    '''Run a plugin over pre-cut frames and return one output, reshaped.

    ff           -- iterable of audio frames, passed to
                    process.process_frames_with_plugin
    channels     -- channel count used to initialise the plugin
    sample_rate  -- sample rate of the audio
    step_size    -- hop between successive frames, in samples
    key          -- identifier of the plugin to load
    output       -- identifier of the output to collect; the empty
                    string (the default) selects the plugin's output 0
    parameters   -- plugin parameter values; None (the default) means
                    an empty dictionary
    block_size   -- block size used to initialise the plugin. When
                    omitted it falls back to step_size, which is only
                    appropriate for non-overlapping frames; pass it
                    explicitly if the frames in ff are longer than
                    step_size.

    Returns whatever reshape produces for the selected output.

    Raises RuntimeError if the plugin cannot be initialised. The plugin
    is unloaded before returning or propagating any exception raised
    after it has been loaded.
    '''
    # Avoid a shared mutable default: a fresh dictionary per call.
    if parameters is None:
        parameters = {}

    # The original body referred to an undefined block_size; it is now
    # an optional argument with a documented fallback.
    if block_size is None:
        block_size = step_size

    plugin = vampyhost.load_plugin(key, sample_rate,
                                   vampyhost.ADAPT_INPUT_DOMAIN +
                                   vampyhost.ADAPT_BUFFER_SIZE +
                                   vampyhost.ADAPT_CHANNEL_COUNT)

    try:
        plugin.set_parameter_values(parameters)

        if not plugin.initialise(channels, step_size, block_size):
            # String exceptions are not valid; raise a real exception type.
            raise RuntimeError("Failed to initialise plugin")

        if output == "":
            output_desc = plugin.get_output(0)
            output = output_desc["identifier"]
        else:
            output_desc = plugin.get_output(output)

        results = process.process_frames_with_plugin(
            ff, sample_rate, step_size, plugin, [output])

        # reshape consumes results completely (into an array or a list),
        # so it is safe to unload the plugin once it has returned.
        return reshape(results, sample_rate, step_size, output_desc)
    finally:
        # Release the plugin even if any of the steps above failed.
        plugin.unload()
Chris@95 127