Mercurial > hg > vampy
comparison Example VamPy plugins/test/PyMFCC_legacy.py @ 52:d56f48aafb99
Updated some example plugins.
author | fazekasgy |
---|---|
date | Thu, 08 Oct 2009 08:59:08 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
51:c1e4f706ca9a | 52:d56f48aafb99 |
---|---|
1 '''PyMFCC_legacy.py - This example Vampy plugin demonstrates | |
2 how to return spectrogram-like features. | |
3 | |
4 This plugin has frequency domain input and is using | |
5 the legacy input interface: the frequency samples are | |
6 passed as python list of complex numbers. | |
7 | |
8 Note: This is not the advised way of writing plugins, | |
9 since the input interfaces provided for Numpy such as the | |
10 Numpy Array interface (flag: vf_ARRAY) are much faster. | |
11 | |
12 This plugin is using Numpy, but it does not rely on Vampy's | |
13 capability of passing Numpy arrays to the process directly. | |
14 However, it returns Numpy arrays from the process. | |
15 | |
16 Centre for Digital Music, Queen Mary University of London. | |
17 Copyright 2006 Gyorgy Fazekas, QMUL. | |
18 (See Vamp API for licence information.) | |
19 | |
20 Constants for Mel frequency conversion and filter | |
21 centre calculation are taken from the GNU GPL licenced | |
22 Freespeech library. Copyright (C) 1999 Jean-Marc Valin | |
23 ''' | |
24 | |
25 import sys,numpy | |
26 from numpy import log,exp,floor,sum | |
27 from numpy import * | |
28 from numpy.fft import * | |
29 import vampy | |
30 from vampy import * | |
31 | |
32 | |
class melScaling(object):
    '''Mel frequency warping and DCT helper used to compute MFCCs
    from a magnitude spectrum.

    The filter bank and the DCT matrix are not built by the constructor;
    they are created on the first call of update() and cached afterwards.
    '''

    def __init__(self,sampleRate,inputSize,numBands,minHz = 0,maxHz = None):
        '''Initialise frequency warping and DCT matrix.
        Parameters:
        sampleRate: audio sample rate
        inputSize: length of magnitude spectrum (half of FFT size assumed)
        numBands: number of mel Bands (MFCCs)
        minHz: lower bound of warping  (default = DC)
        maxHz: higher bound of warping (default = Nyquist frequency)
        '''
        self.sampleRate = sampleRate
        self.NqHz = sampleRate / 2.0
        self.minHz = minHz
        if maxHz is None :
            maxHz = self.NqHz
        self.maxHz = maxHz
        self.inputSize = inputSize
        self.numBands = numBands
        # valid: the filter bank and DCT matrix exist and can be used
        # updated: update() has already been attempted with these settings
        self.valid = False
        self.updated = False

    def update(self):
        '''(Re)calculate the Mel filter matrix and the DCT matrix.

        The calculation runs only once: later calls return the cached
        validity flag until self.updated is reset to False.

        Returns True if the filters are ready to use.
        Raises Exception if maxHz is above the Nyquist frequency.
        '''
        # make sure this will run only once if called from a vamp process
        if self.updated:
            return self.valid
        self.updated = True
        self.valid = False
        print('Updating parameters and recalculating filters: ')
        print('Nyquist:  %s' % (self.NqHz,))

        if self.maxHz > self.NqHz :
            raise Exception('Maximum frequency must be smaller than the Nyquist frequency')

        self.maxMel = 1000*numpy.log(1+self.maxHz/700.0)/numpy.log(1+1000.0/700.0)
        self.minMel = 1000*numpy.log(1+self.minHz/700.0)/numpy.log(1+1000.0/700.0)
        print('minHz:%s\nmaxHz:%s\nminMel:%s\nmaxMel:%s\n' %(self.minHz,self.maxHz,self.minMel,self.maxMel))
        self.filterMatrix = self.getFilterMatrix(self.inputSize,self.numBands)
        self.DCTMatrix = self.getDCTMatrix(self.numBands)
        # one-shot iterator over the rows of the (transposed) filter matrix
        self.filterIter = iter(self.filterMatrix)
        self.valid = True
        return self.valid

    def getFilterCentres(self,inputSize,numBands):
        '''Calculate Mel filter centres around FFT bins.
        This function calculates two extra bands at the edges for
        finding the starting and end point of the first and last
        actual filters.

        Requires self.minMel and self.maxMel, which are set by update().
        Returns an integer array of numBands+2 bin indices.'''
        centresMel = numpy.arange(numBands+2) * (self.maxMel-self.minMel)/(numBands+1) + self.minMel
        # convert the Mel positions back to Hz, then to the nearest FFT bin
        centresBin = numpy.floor(0.5 + 700.0*inputSize*(numpy.exp(centresMel*numpy.log(1+1000.0/700.0)/1000.0)-1)/self.NqHz)
        return numpy.array(centresBin,int)

    def getFilterMatrix(self,inputSize,numBands):
        '''Compose the Mel scaling matrix.

        Returns an array of shape (inputSize, numBands), so that
        numpy.dot(magnitudeSpectrum, matrix) yields one value per band.
        The bin positions used are stored in self.filterCentres.'''
        filterMatrix = numpy.zeros((numBands,inputSize))
        self.filterCentres = self.getFilterCentres(inputSize,numBands)
        for i in range(numBands) :
            # each filter spans three consecutive centres: start, peak, end
            start,centre,end = self.filterCentres[i:i+3]
            self.setFilter(filterMatrix[i],start,centre,end)
        return filterMatrix.transpose()

    def setFilter(self,filt,filterStart,filterCentre,filterEnd):
        '''Calculate a single Mel filter.

        Writes a triangular window into filt in place: rising from 0 at
        filterStart towards filterCentre, then falling from 1 at
        filterCentre towards filterEnd. A zero width slope leaves the
        corresponding part of filt untouched (the slices are empty).'''
        k1 = float(filterCentre-filterStart)
        k2 = float(filterEnd-filterCentre)
        up = (numpy.arange(filterStart,filterCentre)-filterStart)/k1
        dn = (filterEnd-numpy.arange(filterCentre,filterEnd))/k2
        filt[filterStart:filterCentre] = up
        filt[filterCentre:filterEnd] = dn

    def warpSpectrum(self,magnitudeSpectrum):
        '''Compute the Mel scaled spectrum.'''
        return numpy.dot(magnitudeSpectrum,self.filterMatrix)

    def getDCTMatrix(self,size):
        '''Calculate the square DCT transform matrix. Results are
        equivalent to Matlab dctmtx(n) but with 64 bit precision.'''
        # DCTmx[k][n] == k (row index repeated along each row)
        DCTmx = numpy.arange(size,dtype=numpy.float64).repeat(size).reshape(size,size)
        # DCTmxT[k][n] == pi * (n + 0.5) / size
        DCTmxT = numpy.pi * (DCTmx.transpose()+0.5) / size
        DCTmxT = (1.0/numpy.sqrt( size / 2.0)) * numpy.cos(DCTmx * DCTmxT)
        # scale the first (constant) row
        DCTmxT[0] = DCTmxT[0] * (numpy.sqrt(2.0)/2.0)
        return DCTmxT

    def dct(self,data_matrix):
        '''Compute DCT of input matrix.'''
        return numpy.dot(self.DCTMatrix,data_matrix)

    def getMFCCs(self,warpedSpectrum,cn=True):
        '''Compute MFCC coefficients from Mel warped magnitude spectrum.

        Parameters:
        warpedSpectrum: output of warpSpectrum()
        cn: if False, the first coefficient (C0) is set to zero

        Band values are floored at the smallest positive normal double
        before taking the logarithm. Without this a silent frame or an
        empty Mel band gives log(0) = -inf and the DCT turns the whole
        coefficient vector into NaN/inf.'''
        floorValue = numpy.finfo(numpy.float64).tiny
        bands = numpy.maximum(numpy.asarray(warpedSpectrum,dtype=numpy.float64),floorValue)
        mfccs = self.dct(numpy.log(bands))
        if cn is False :
            mfccs[0] = 0.0
        return mfccs
138 | |
139 | |
class PyMFCC_legacy(melScaling):
    '''Vampy example plugin returning MFCCs, the Mel scaled magnitude
    spectrum and the Mel filter matrix.

    The plugin takes frequency domain input through the legacy
    interface (python lists of complex numbers) and returns Numpy
    arrays. Filter calculations are inherited from melScaling.
    '''

    def __init__(self,inputSampleRate):
        '''Set default plugin state for the given input sample rate.'''

        # flags for setting some Vampy options
        self.vampy_flags = vf_DEBUG | vf_REALTIME

        self.m_inputSampleRate = int(inputSampleRate)
        self.m_stepSize = 512
        self.m_blockSize = 2048
        self.m_channels = 1
        self.numBands = 40
        # number of leading coefficients dropped from the MFCC output:
        # 1 skips C0, 0 returns it (inverse of the 'cnull' parameter)
        self.cnull = 1
        self.two_ch = False
        # '//' keeps inputSize an integer on both Python 2 and 3
        melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize//2,self.numBands)

    def initialise(self,channels,stepSize,blockSize):
        '''Store the processing configuration and reset the filter state.

        The frequency bounds currently held in self.minHz / self.maxHz
        are handed on to melScaling.__init__ so that values set through
        setParameter() before initialise() are not replaced by the
        defaults. Always returns True.'''
        self.m_channels = channels
        self.m_stepSize = stepSize
        self.m_blockSize = blockSize
        self.window = numpy.hamming(blockSize)
        melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize//2,
                            self.numBands,self.minHz,self.maxHz)
        return True

    def getMaker(self):
        return 'Vampy Test Plugins'

    def getCopyright(self):
        return 'Plugin By George Fazekas'

    def getName(self):
        return 'Vampy Legacy FrequencyDomain MFCC Plugin'

    def getIdentifier(self):
        return 'vampy-mfcc-test-legacy'

    def getDescription(self):
        return 'Vampy FrequencyDomain MFCC Plugin using the Legacy interface.'

    def getMaxChannelCount(self):
        return 2

    def getInputDomain(self):
        return FrequencyDomain

    def getPreferredBlockSize(self):
        return 2048

    def getPreferredStepSize(self):
        return 512

    def getOutputDescriptors(self):
        '''Describe the three outputs: MFCCs (0), Mel scaled spectrum (1)
        and the Mel filter matrix (2).'''

        Generic = OutputDescriptor()
        Generic.hasFixedBinCount=True
        Generic.binCount=int(self.numBands)-self.cnull
        Generic.hasKnownExtents=False
        Generic.isQuantized=True
        Generic.sampleType = OneSamplePerStep

        # bin counts double when the two channels are returned side by side
        separateChannels = self.two_ch and self.m_channels == 2

        # note the inheritance of attributes (use is optional)
        MFCC = OutputDescriptor(Generic)
        MFCC.identifier = 'mfccs'
        MFCC.name = 'MFCCs'
        MFCC.description = 'MFCC Coefficients'
        MFCC.binNames=['C '+str(x) for x in range(self.cnull,int(self.numBands))]
        MFCC.unit = None
        if separateChannels :
            MFCC.binCount = self.m_channels * (int(self.numBands)-self.cnull)
        else :
            MFCC.binCount = self.numBands-self.cnull

        warpedSpectrum = OutputDescriptor(Generic)
        warpedSpectrum.identifier='warped-fft'
        warpedSpectrum.name='Mel Scaled Spectrum'
        warpedSpectrum.description='Mel Scaled Magnitide Spectrum'
        warpedSpectrum.unit='Mel'
        if separateChannels :
            warpedSpectrum.binCount = self.m_channels * int(self.numBands)
        else :
            warpedSpectrum.binCount = self.numBands

        melFilter = OutputDescriptor(Generic)
        melFilter.identifier = 'mel-filter-matrix'
        melFilter.sampleType='FixedSampleRate'
        # float division: integer division would truncate the rate on Python 2
        melFilter.sampleRate=float(self.m_inputSampleRate)/self.m_stepSize
        melFilter.name='Mel Filter Matrix'
        melFilter.description='Returns the created filter matrix in getRemainingFeatures.'
        melFilter.unit = None
        # every row returned by getRemainingFeatures holds one weight per
        # Mel band, independent of the cnull setting
        melFilter.binCount = int(self.numBands)

        return OutputList(MFCC,warpedSpectrum,melFilter)

    def getParameterDescriptors(self):
        '''Describe the adjustable parameters of the plugin.'''

        melbands = ParameterDescriptor()
        melbands.identifier='melbands'
        melbands.name='Number of bands (coefficients)'
        melbands.description='Set the number of coefficients.'
        melbands.unit = ''
        melbands.minValue = 2
        melbands.maxValue = 128
        melbands.defaultValue = 40
        melbands.isQuantized = True
        melbands.quantizeStep = 1

        cnull = ParameterDescriptor()
        cnull.identifier='cnull'
        cnull.name='Return C0'
        cnull.description='Select if the DC coefficient is required.'
        cnull.unit = None
        cnull.minValue = 0
        cnull.maxValue = 1
        cnull.defaultValue = 0
        cnull.isQuantized = True
        cnull.quantizeStep = 1

        # copy of the on/off descriptor above with its own identity
        two_ch = ParameterDescriptor(cnull)
        two_ch.identifier='two_ch'
        two_ch.name='Process channels separately'
        two_ch.description='Process two channel files separately.'
        two_ch.defaultValue = False

        minHz = ParameterDescriptor()
        minHz.identifier='minHz'
        minHz.name='minimum frequency'
        minHz.description='Set the lower frequency bound.'
        minHz.unit='Hz'
        minHz.minValue = 0
        minHz.maxValue = 24000
        minHz.defaultValue = 0
        minHz.isQuantized = True
        minHz.quantizeStep = 1.0

        # NOTE(review): the declared default (11025) differs from the value
        # the plugin starts with (the Nyquist frequency) - confirm intent
        maxHz = ParameterDescriptor()
        maxHz.identifier='maxHz'
        maxHz.description='Set the upper frequency bound.'
        maxHz.name='maximum frequency'
        maxHz.unit='Hz'
        maxHz.minValue = 100
        maxHz.maxValue = 24000
        maxHz.defaultValue = 11025
        maxHz.isQuantized = True
        maxHz.quantizeStep = 100

        return ParameterList(melbands,minHz,maxHz,cnull,two_ch)

    def setParameter(self,paramid,newval):
        '''Set the parameter named paramid to newval.

        minHz is only accepted below both maxHz and the Nyquist
        frequency. maxHz falls back to the Nyquist frequency unless it
        is below Nyquist and more than 1000 Hz above minHz. Unknown
        identifiers are ignored.

        The cached filters are marked stale so that the next update()
        rebuilds them; leaving self.updated set would make update()
        report invalid filters forever after a parameter change.'''
        self.valid = False
        self.updated = False
        if paramid == 'minHz' :
            if newval < self.maxHz and newval < self.NqHz :
                self.minHz = float(newval)
            print('minHz:  %s' % (self.minHz,))
        elif paramid == 'maxHz' :
            print('trying to set maxHz to:  %s' % (newval,))
            if newval < self.NqHz and newval > self.minHz+1000 :
                self.maxHz = float(newval)
            else :
                self.maxHz = self.NqHz
            print('set to:  %s' % (self.maxHz,))
        elif paramid == 'cnull' :
            # parameter means "return C0"; self.cnull counts skipped coefficients
            self.cnull = int(not int(newval))
        elif paramid == 'melbands' :
            self.numBands = int(newval)
        elif paramid == 'two_ch' :
            self.two_ch = bool(newval)

        return

    def getParameter(self,paramid):
        '''Return the current value of a parameter as float
        (0.0 for unknown identifiers).'''
        if paramid == 'minHz' :
            return float(self.minHz)
        if paramid == 'maxHz' :
            return float(self.maxHz)
        if paramid == 'cnull' :
            return float(not int(self.cnull))
        if paramid == 'melbands' :
            return float(self.numBands)
        if paramid == 'two_ch' :
            return float(self.two_ch)
        return 0.0

    def _magnitudeSpectrum(self,inputbuffer):
        '''Return the magnitudes of the first blockSize/2 bins of one
        channel given as a list of complex numbers.'''
        return numpy.abs(numpy.array(inputbuffer))[0:self.m_blockSize//2]

    # set numpy process using the 'use_numpy_interface' flag
    def process(self,inputbuffers,timestamp):
        '''Compute the MFCCs and the Mel spectrum of one block.

        With more than one channel the mean of the magnitude spectra of
        the first two channels is used, unless separate processing was
        requested, in which case process2ch() handles the block.
        Returns None if the filters could not be set up.'''

        if not self.update() :
            return None

        if self.m_channels == 2 and self.two_ch :
            return self.process2ch(inputbuffers,timestamp)

        if self.m_channels > 1 :
            # take the mean of the two magnitude spectra
            magnitudeSpectrum0 = self._magnitudeSpectrum(inputbuffers[0])
            magnitudeSpectrum1 = self._magnitudeSpectrum(inputbuffers[1])
            magnitudeSpectrum = (magnitudeSpectrum0 + magnitudeSpectrum1) / 2
        else :
            magnitudeSpectrum = self._magnitudeSpectrum(inputbuffers[0])

        # do the computation
        melSpectrum = self.warpSpectrum(magnitudeSpectrum)
        melCepstrum = self.getMFCCs(melSpectrum,cn=True)

        outputs = FeatureSet()
        outputs[0] = Feature(melCepstrum[self.cnull:])
        outputs[1] = Feature(melSpectrum)
        return outputs

    # process channels separately (stack the returned arrays)
    def process2ch(self,inputbuffers,timestamp):
        '''Compute MFCCs and Mel spectra for both channels and return
        them stacked in one array per output (second channel first).'''

        magnitudeSpectrum0 = self._magnitudeSpectrum(inputbuffers[0])
        magnitudeSpectrum1 = self._magnitudeSpectrum(inputbuffers[1])

        # do the computations
        melSpectrum0 = self.warpSpectrum(magnitudeSpectrum0)
        melCepstrum0 = self.getMFCCs(melSpectrum0,cn=True)
        melSpectrum1 = self.warpSpectrum(magnitudeSpectrum1)
        melCepstrum1 = self.getMFCCs(melSpectrum1,cn=True)

        outputs = FeatureSet()
        outputs[0] = Feature(numpy.hstack((melCepstrum1[self.cnull:],melCepstrum0[self.cnull:])))
        outputs[1] = Feature(numpy.hstack((melSpectrum1,melSpectrum0)))
        return outputs

    def getRemainingFeatures(self):
        '''Return the rows of the Mel filter matrix as timestamped
        features on the third output, one row per step.

        Returns an empty list if the filters could not be set up.'''
        if not self.update() :
            return []
        frameSampleStart = 0

        output_featureSet = FeatureSet()

        # the filter is the third output (index starts from zero)
        output_featureSet[2] = flist = FeatureList()

        # filterIter is a one-shot iterator created by update()
        for row in self.filterIter :
            f = Feature()
            f.hasTimestamp = True
            f.timestamp = frame2RealTime(frameSampleStart,self.m_inputSampleRate)
            f.values = row
            flist.append(f)
            frameSampleStart += self.m_stepSize

        return output_featureSet