comparison Example VamPy plugins/test/PyMFCC_buffer.py @ 37:27bab3a16c9a vampy2final

new branch Vampy2final
author fazekasgy
date Mon, 05 Oct 2009 11:28:00 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 37:27bab3a16c9a
1 '''PyMFCC_buffer.py - This example Vampy plugin demonstrates
2 how to return spectrogram-like features.
3
4 This plugin uses the numpy BUFFER interface and
5 frequency domain input. Flag: vf_BUFFER
6
7 Centre for Digital Music, Queen Mary University of London.
8 Copyright 2006 Gyorgy Fazekas, QMUL.
9 (See Vamp API for licence information.)
10
11 Constants for Mel frequency conversion and filter
12 centre calculation are taken from the GNU GPL licenced
13 Freespeech library. Copyright (C) 1999 Jean-Marc Valin
14 '''
15
16 import sys,numpy
17 from numpy import log,exp,floor,sum
18 from numpy import *
19 from numpy.fft import *
20 import vampy
21 from vampy import *
22
class melScaling(object):
    '''Mel frequency warping and DCT helper used to compute MFCCs.

    The expensive tables (Mel filter matrix and DCT matrix) are not built
    in the constructor; they are built by update(), which does the work
    only on its first call after (re)initialisation.
    '''

    def __init__(self,sampleRate,inputSize,numBands,minHz = 0,maxHz = None):
        '''Store the warping parameters (no tables are computed here).
        Parameters:
        sampleRate: audio sample rate
        inputSize: length of magnitude spectrum (half of FFT size assumed)
        numBands: number of mel Bands (MFCCs)
        minHz: lower bound of warping (default = DC)
        maxHz: higher bound of warping (default = Nyquist frequency)
        '''
        self.sampleRate = sampleRate
        self.NqHz = sampleRate / 2.0
        self.minHz = minHz
        if maxHz is None:
            maxHz = self.NqHz
        self.maxHz = maxHz
        # Both sizes are used as array dimensions and range bounds, so
        # force integers in case a caller passes e.g. blockSize/2 as float.
        self.inputSize = int(inputSize)
        self.numBands = int(numBands)
        self.valid = False
        self.updated = False

    def update(self):
        '''Build the Mel filter matrix and the DCT matrix.

        Runs only once per initialisation (so it can be called from every
        vamp process call); later calls just return the stored result.
        Returns True when the tables are valid.
        Raises Exception if maxHz is above the Nyquist frequency.
        '''
        if self.updated:
            return self.valid
        self.updated = True
        self.valid = False
        print('Updating parameters and recalculating filters: ')
        print('Nyquist:  %s' % (self.NqHz,))

        if self.maxHz > self.NqHz:
            raise Exception('Maximum frequency must be smaller than the Nyquist frequency')

        self.maxMel = 1000*numpy.log(1+self.maxHz/700.0)/numpy.log(1+1000.0/700.0)
        self.minMel = 1000*numpy.log(1+self.minHz/700.0)/numpy.log(1+1000.0/700.0)
        print('minHz:%s\nmaxHz:%s\nminMel:%s\nmaxMel:%s\n' %(self.minHz,self.maxHz,self.minMel,self.maxMel))
        self.filterMatrix = self.getFilterMatrix(self.inputSize,self.numBands)
        self.DCTMatrix = self.getDCTMatrix(self.numBands)
        # iterator over the rows of the (transposed) filter matrix
        self.filterIter = iter(self.filterMatrix)
        self.valid = True
        return self.valid

    def getFilterCentres(self,inputSize,numBands):
        '''Calculate Mel filter centres around FFT bins.
        This function calculates two extra bands at the edges for
        finding the starting and end point of the first and last
        actual filters. Returns an integer array of numBands+2 bin indices.'''
        # equally spaced points on the Mel axis between minMel and maxMel
        centresMel = numpy.arange(numBands+2) * (self.maxMel-self.minMel)/(numBands+1) + self.minMel
        # convert Mel back to Hz, scale to bin index and round to nearest bin
        centresBin = numpy.floor(0.5 + 700.0*inputSize*(numpy.exp(centresMel*numpy.log(1+1000.0/700.0)/1000.0)-1)/self.NqHz)
        return numpy.array(centresBin,int)

    def getFilterMatrix(self,inputSize,numBands):
        '''Compose the Mel scaling matrix.
        Returns an array of shape (inputSize, numBands); the filter
        centres used are kept in self.filterCentres.'''
        filterMatrix = numpy.zeros((numBands,inputSize))
        self.filterCentres = self.getFilterCentres(inputSize,numBands)
        for band in range(numBands):
            # each filter spans three consecutive centres
            start,centre,end = self.filterCentres[band:band+3]
            self.setFilter(filterMatrix[band],start,centre,end)
        return filterMatrix.transpose()

    def setFilter(self,filt,filterStart,filterCentre,filterEnd):
        '''Calculate a single Mel filter.
        Writes a triangle into filt (in place): rising from 0 at
        filterStart towards filterCentre, then falling from 1 at
        filterCentre towards filterEnd.'''
        rise = float(filterCentre-filterStart)
        fall = float(filterEnd-filterCentre)
        # a zero-width slope has no bins to fill (and would divide by zero)
        if rise > 0:
            filt[filterStart:filterCentre] = (numpy.arange(filterStart,filterCentre)-filterStart)/rise
        if fall > 0:
            filt[filterCentre:filterEnd] = (filterEnd-numpy.arange(filterCentre,filterEnd))/fall

    def warpSpectrum(self,magnitudeSpectrum):
        '''Compute the Mel scaled spectrum.'''
        return numpy.dot(magnitudeSpectrum,self.filterMatrix)

    def getDCTMatrix(self,size):
        '''Calculate the square DCT transform matrix. Results are
        equivalent to Matlab dctmtx(n) but with 64 bit precision.'''
        row = numpy.arange(size,dtype=numpy.float64).reshape(size,1)
        col = numpy.arange(size,dtype=numpy.float64).reshape(1,size)
        DCTmx = (1.0/numpy.sqrt(size/2.0)) * numpy.cos(row * (numpy.pi*(col+0.5)/size))
        # the first row carries an extra 1/sqrt(2) factor
        DCTmx[0] = DCTmx[0] * (numpy.sqrt(2.0)/2.0)
        return DCTmx

    def dct(self,data_matrix):
        '''Compute DCT of input matrix.'''
        return numpy.dot(self.DCTMatrix,data_matrix)

    def getMFCCs(self,warpedSpectrum,cn=True):
        '''Compute MFCC coefficients from Mel warped magnitude spectrum.
        If cn is False the first coefficient is set to zero.'''
        # Bands with zero energy (e.g. digital silence) would give
        # log(0) = -inf and NaNs after the DCT, so clamp to the smallest
        # positive normal float64 before taking the logarithm.
        spectrum = numpy.asarray(warpedSpectrum,dtype=numpy.float64)
        floor = numpy.finfo(numpy.float64).tiny
        mfccs = self.dct(numpy.log(numpy.maximum(spectrum,floor)))
        if cn is False:
            mfccs[0] = 0.0
        return mfccs
127
128
class PyMFCC_buffer(melScaling):
    '''Example Vampy plugin using the buffer interface (vf_BUFFER) with
    frequency domain input.

    Outputs (in order): MFCCs, the Mel scaled spectrum, and the Mel
    filter matrix (the latter is returned from getRemainingFeatures).
    '''

    def __init__(self,inputSampleRate):
        '''Set plugin defaults and the default Mel scaling parameters.'''

        # flags for setting some Vampy options
        self.vampy_flags = vf_DEBUG | vf_BUFFER | vf_REALTIME

        self.m_inputSampleRate = int(inputSampleRate)
        self.m_stepSize = 512
        self.m_blockSize = 2048
        self.m_channels = 1
        self.numBands = 40
        # index of the first returned coefficient: 1 skips C0, 0 keeps it
        self.cnull = 1
        self.two_ch = False
        # integer division: the spectrum length is used as an array size
        melScaling.__init__(self,self.m_inputSampleRate,self.m_blockSize//2,self.numBands)

    def initialise(self,channels,stepSize,blockSize):
        '''Store the host's processing configuration and reset the Mel
        scaling so the filters are rebuilt on the next update().'''
        self.m_channels = channels
        self.m_stepSize = stepSize
        self.m_blockSize = blockSize
        self.window = numpy.hamming(blockSize)
        # Forward the current minHz/maxHz: re-initialising with the
        # defaults would silently discard the frequency bounds that
        # were set through setParameter before this call.
        melScaling.__init__(self,self.m_inputSampleRate,self.m_blockSize//2,self.numBands,self.minHz,self.maxHz)
        return True

    def getMaker(self):
        return 'Vampy Test Plugins'

    def getCopyright(self):
        return 'Plugin By George Fazekas'

    def getName(self):
        return 'Vampy Buffer MFCC Plugin'

    def getIdentifier(self):
        return 'vampy-mfcc-test-buffer'

    def getDescription(self):
        return 'A simple MFCC plugin. (using the Buffer interface)'

    def getMaxChannelCount(self):
        return 2

    def getInputDomain(self):
        return FrequencyDomain

    def getPreferredBlockSize(self):
        return 2048

    def getPreferredStepSize(self):
        return 512

    def getOutputDescriptors(self):
        '''Describe the three outputs: MFCCs, warped spectrum and the
        Mel filter matrix.'''

        numBands = int(self.numBands)
        # bin counts double when the two channels are returned side by side
        stacked = self.two_ch and self.m_channels == 2

        Generic = OutputDescriptor()
        Generic.hasFixedBinCount=True
        Generic.binCount=numBands-self.cnull
        Generic.hasKnownExtents=False
        Generic.isQuantized=True
        Generic.sampleType = OneSamplePerStep

        # note the inheritance of attributes (use is optional)
        MFCC = OutputDescriptor(Generic)
        MFCC.identifier = 'mfccs'
        MFCC.name = 'MFCCs'
        MFCC.description = 'MFCC Coefficients'
        MFCC.binNames=['C '+str(x) for x in range(self.cnull,numBands)]
        MFCC.unit = None
        if stacked:
            MFCC.binCount = self.m_channels * (numBands-self.cnull)
        else:
            MFCC.binCount = self.numBands-self.cnull

        warpedSpectrum = OutputDescriptor(Generic)
        warpedSpectrum.identifier='warped-fft'
        warpedSpectrum.name='Mel Scaled Spectrum'
        warpedSpectrum.description='Mel Scaled Magnitide Spectrum'
        warpedSpectrum.unit='Mel'
        if stacked:
            warpedSpectrum.binCount = self.m_channels * numBands
        else:
            warpedSpectrum.binCount = self.numBands

        melFilter = OutputDescriptor(Generic)
        melFilter.identifier = 'mel-filter-matrix'
        melFilter.sampleType='FixedSampleRate'
        # integer division, as in the original Python 2 code
        melFilter.sampleRate=self.m_inputSampleRate//self.m_stepSize
        melFilter.name='Mel Filter Matrix'
        melFilter.description='Returns the created filter matrix in getRemainingFeatures.'
        melFilter.unit = None

        return OutputList(MFCC,warpedSpectrum,melFilter)

    def getParameterDescriptors(self):
        '''Describe the parameters: melbands, minHz, maxHz, cnull, two_ch.'''

        melbands = ParameterDescriptor()
        melbands.identifier='melbands'
        melbands.name='Number of bands (coefficients)'
        melbands.description='Set the number of coefficients.'
        melbands.unit = ''
        melbands.minValue = 2
        melbands.maxValue = 128
        melbands.defaultValue = 40
        melbands.isQuantized = True
        melbands.quantizeStep = 1

        cnull = ParameterDescriptor()
        cnull.identifier='cnull'
        cnull.name='Return C0'
        cnull.description='Select if the DC coefficient is required.'
        cnull.unit = None
        cnull.minValue = 0
        cnull.maxValue = 1
        cnull.defaultValue = 0
        cnull.isQuantized = True
        cnull.quantizeStep = 1

        # two_ch starts as a copy of the cnull descriptor (same 0/1 range)
        two_ch = ParameterDescriptor(cnull)
        two_ch.identifier='two_ch'
        two_ch.name='Process channels separately'
        two_ch.description='Process two channel files separately.'
        two_ch.defaultValue = False

        minHz = ParameterDescriptor()
        minHz.identifier='minHz'
        minHz.name='minimum frequency'
        minHz.description='Set the lower frequency bound.'
        minHz.unit='Hz'
        minHz.minValue = 0
        minHz.maxValue = 24000
        minHz.defaultValue = 0
        minHz.isQuantized = True
        minHz.quantizeStep = 1.0

        maxHz = ParameterDescriptor()
        maxHz.identifier='maxHz'
        maxHz.description='Set the upper frequency bound.'
        maxHz.name='maximum frequency'
        maxHz.unit='Hz'
        maxHz.minValue = 100
        maxHz.maxValue = 24000
        maxHz.defaultValue = 11025
        maxHz.isQuantized = True
        maxHz.quantizeStep = 100

        return ParameterList(melbands,minHz,maxHz,cnull,two_ch)

    def setParameter(self,paramid,newval):
        '''Store a new parameter value and mark the filters invalid.

        minHz is only accepted below both maxHz and Nyquist; maxHz
        falls back to Nyquist unless it is below Nyquist and more than
        1000 Hz above minHz. The filters themselves are rebuilt by
        update() after the next initialise().'''
        self.valid = False
        if paramid == 'minHz':
            if newval < self.maxHz and newval < self.NqHz:
                self.minHz = float(newval)
            print('minHz:  %s' % (self.minHz,))
        if paramid == 'maxHz':
            print('trying to set maxHz to:  %s' % (newval,))
            if newval < self.NqHz and newval > self.minHz+1000:
                self.maxHz = float(newval)
            else:
                self.maxHz = self.NqHz
            print('set to:  %s' % (self.maxHz,))
        if paramid == 'cnull':
            # stored inverted: "return C0" = 1 means start at index 0
            self.cnull = int(not int(newval))
        if paramid == 'melbands':
            self.numBands = int(newval)
        if paramid == 'two_ch':
            self.two_ch = bool(newval)

        return

    def getParameter(self,paramid):
        '''Return the current value of a parameter as a float
        (0.0 for an unknown identifier).'''
        if paramid == 'minHz':
            return float(self.minHz)
        if paramid == 'maxHz':
            return float(self.maxHz)
        if paramid == 'cnull':
            # undo the inversion applied in setParameter
            return float(not int(self.cnull))
        if paramid == 'melbands':
            return float(self.numBands)
        if paramid == 'two_ch':
            return float(self.two_ch)
        return 0.0

    def _magnitudeSpectrum(self,inputbuffer):
        '''Read one channel buffer as complex64 values and return the
        magnitudes of its first blockSize/2 entries.'''
        complexSpectrum = numpy.frombuffer(inputbuffer,numpy.complex64,-1,0)
        return numpy.abs(complexSpectrum)[0:self.m_blockSize//2]

    # numpy process using the buffer interface
    def process(self,inputbuffers,timestamp):
        '''Compute the MFCCs (output 0) and the Mel scaled spectrum
        (output 1) for one block. Returns None if update() reports the
        filters are not valid.'''

        if not self.update():
            return None

        if self.m_channels == 2 and self.two_ch:
            return self.process2ch(inputbuffers,timestamp)

        if self.m_channels > 1:
            # take the mean of the two magnitude spectra
            magnitudeSpectrum0 = self._magnitudeSpectrum(inputbuffers[0])
            magnitudeSpectrum1 = self._magnitudeSpectrum(inputbuffers[1])
            magnitudeSpectrum = (magnitudeSpectrum0 + magnitudeSpectrum1) / 2
        else:
            magnitudeSpectrum = self._magnitudeSpectrum(inputbuffers[0])

        # do the computation
        melSpectrum = self.warpSpectrum(magnitudeSpectrum)
        melCepstrum = self.getMFCCs(melSpectrum,cn=True)

        # output feature set (the builtin dict type can also be used)
        outputs = FeatureSet()
        outputs[0] = Feature(melCepstrum[self.cnull:])
        outputs[1] = Feature(melSpectrum)

        return outputs

    # process two channel files (stack the returned arrays)
    def process2ch(self,inputbuffers,timestamp):
        '''Process both channels separately and return their features
        stacked side by side (second channel first, then the first).'''

        magnitudeSpectrum0 = self._magnitudeSpectrum(inputbuffers[0])
        magnitudeSpectrum1 = self._magnitudeSpectrum(inputbuffers[1])

        # do the computations
        melSpectrum0 = self.warpSpectrum(magnitudeSpectrum0)
        melCepstrum0 = self.getMFCCs(melSpectrum0,cn=True)
        melSpectrum1 = self.warpSpectrum(magnitudeSpectrum1)
        melCepstrum1 = self.getMFCCs(melSpectrum1,cn=True)

        outputs = FeatureSet()
        outputs[0] = Feature(numpy.hstack((melCepstrum1[self.cnull:],melCepstrum0[self.cnull:])))
        outputs[1] = Feature(numpy.hstack((melSpectrum1,melSpectrum0)))

        return outputs

    def getRemainingFeatures(self):
        '''Return the Mel filter matrix as the third output, one
        timestamped feature per row left in self.filterIter.
        Returns an empty list if update() reports invalid filters.'''
        if not self.update():
            return []

        output_featureSet = FeatureSet()

        # the filter is the third output (index starts from zero)
        output_featureSet[2] = flist = FeatureList()

        frameSampleStart = 0
        # the iterator is consumed here, so a second call yields no rows
        for filterRow in self.filterIter:
            f = Feature()
            f.hasTimestamp = True
            f.timestamp = frame2RealTime(frameSampleStart,self.m_inputSampleRate)
            f.values = filterRow
            flist.append(f)
            frameSampleStart += self.m_stepSize

        return output_featureSet
392