Mercurial > hg > vampy
comparison Example VamPy plugins/test/PyMFCC_oldstyle.py @ 37:27bab3a16c9a vampy2final
new branch Vampy2final
author | fazekasgy |
---|---|
date | Mon, 05 Oct 2009 11:28:00 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 37:27bab3a16c9a |
---|---|
1 '''PyMFCC_oldstyle.py - This example Vampy plugin demonstrates | |
2 how to return sprectrogram-like features. | |
3 | |
4 This plugin uses backward compatible syntax and | |
5 no extension module. | |
6 | |
7 Centre for Digital Music, Queen Mary University of London. | |
8 Copyright 2006 Gyorgy Fazekas, QMUL. | |
9 (See Vamp API for licence information.) | |
10 | |
11 Constants for Mel frequency conversion and filter | |
12 centre calculation are taken from the GNU GPL licenced | |
13 Freespeech library. Copyright (C) 1999 Jean-Marc Valin | |
14 ''' | |
15 | |
16 import sys,numpy | |
17 from numpy import log,exp,floor,sum | |
18 from numpy import * | |
19 from numpy.fft import * | |
20 | |
21 | |
22 class melScaling(object): | |
23 | |
24 def __init__(self,sampleRate,inputSize,numBands,minHz = 0,maxHz = None): | |
25 '''Initialise frequency warping and DCT matrix. | |
26 Parameters: | |
27 sampleRate: audio sample rate | |
28 inputSize: length of magnitude spectrum (half of FFT size assumed) | |
29 numBands: number of mel Bands (MFCCs) | |
30 minHz: lower bound of warping (default = DC) | |
31 maxHz: higher bound of warping (default = Nyquist frequency) | |
32 ''' | |
33 self.sampleRate = sampleRate | |
34 self.NqHz = sampleRate / 2.0 | |
35 self.minHz = minHz | |
36 if maxHz is None : maxHz = self.NqHz | |
37 self.maxHz = maxHz | |
38 self.inputSize = inputSize | |
39 self.numBands = numBands | |
40 self.valid = False | |
41 self.updated = False | |
42 # print '\n\n>>Plugin initialised with sample rate: %s<<\n\n' %self.sampleRate | |
43 # print 'minHz:%s\nmaxHz:%s\n' %(self.minHz,self.maxHz) | |
44 | |
45 | |
46 def update(self): | |
47 # make sure this will run only once if called from a vamp process | |
48 | |
49 if self.updated: return self.valid | |
50 self.updated = True | |
51 self.valid = False | |
52 print 'Updating parameters and recalculating filters: ' | |
53 print 'Nyquist: ',self.NqHz | |
54 | |
55 if self.maxHz > self.NqHz : | |
56 raise Exception('Maximum frequency must be smaller than the Nyquist frequency') | |
57 | |
58 self.maxMel = 1000*log(1+self.maxHz/700.0)/log(1+1000.0/700.0) | |
59 self.minMel = 1000*log(1+self.minHz/700.0)/log(1+1000.0/700.0) | |
60 print 'minHz:%s\nmaxHz:%s\nminMel:%s\nmaxMel:%s\n' %(self.minHz,self.maxHz,self.minMel,self.maxMel) | |
61 self.filterMatrix = self.getFilterMatrix(self.inputSize,self.numBands) | |
62 self.DCTMatrix = self.getDCTMatrix(self.numBands) | |
63 self.filterIter = self.filterMatrix.__iter__() | |
64 self.valid = True | |
65 return self.valid | |
66 | |
67 # try : | |
68 # self.maxMel = 1000*log(1+self.maxHz/700.0)/log(1+1000.0/700.0) | |
69 # self.minMel = 1000*log(1+self.minHz/700.0)/log(1+1000.0/700.0) | |
70 # self.filterMatrix = self.getFilterMatrix(self.inputSize,self.numBands) | |
71 # self.DCTMatrix = self.getDCTMatrix(self.numBands) | |
72 # self.filterIter = self.filterMatrix.__iter__() | |
73 # self.valid = True | |
74 # return True | |
75 # except : | |
76 # print "Invalid parameter setting encountered in MelScaling class." | |
77 # return False | |
78 # return True | |
79 | |
80 def getFilterCentres(self,inputSize,numBands): | |
81 '''Calculate Mel filter centres around FFT bins. | |
82 This function calculates two extra bands at the edges for | |
83 finding the starting and end point of the first and last | |
84 actual filters.''' | |
85 centresMel = numpy.array(xrange(numBands+2)) * (self.maxMel-self.minMel)/(numBands+1) + self.minMel | |
86 centresBin = numpy.floor(0.5 + 700.0*inputSize*(exp(centresMel*log(1+1000.0/700.0)/1000.0)-1)/self.NqHz) | |
87 return numpy.array(centresBin,int) | |
88 | |
89 def getFilterMatrix(self,inputSize,numBands): | |
90 '''Compose the Mel scaling matrix.''' | |
91 filterMatrix = numpy.zeros((numBands,inputSize)) | |
92 self.filterCentres = self.getFilterCentres(inputSize,numBands) | |
93 for i in xrange(numBands) : | |
94 start,centre,end = self.filterCentres[i:i+3] | |
95 self.setFilter(filterMatrix[i],start,centre,end) | |
96 return filterMatrix.transpose() | |
97 | |
98 def setFilter(self,filt,filterStart,filterCentre,filterEnd): | |
99 '''Calculate a single Mel filter.''' | |
100 k1 = numpy.float32(filterCentre-filterStart) | |
101 k2 = numpy.float32(filterEnd-filterCentre) | |
102 up = (numpy.array(xrange(filterStart,filterCentre))-filterStart)/k1 | |
103 dn = (filterEnd-numpy.array(xrange(filterCentre,filterEnd)))/k2 | |
104 filt[filterStart:filterCentre] = up | |
105 filt[filterCentre:filterEnd] = dn | |
106 | |
107 def warpSpectrum(self,magnitudeSpectrum): | |
108 '''Compute the Mel scaled spectrum.''' | |
109 return numpy.dot(magnitudeSpectrum,self.filterMatrix) | |
110 | |
111 def getDCTMatrix(self,size): | |
112 '''Calculate the square DCT transform matrix. Results are | |
113 equivalent to Matlab dctmtx(n) but with 64 bit precision.''' | |
114 DCTmx = numpy.array(xrange(size),numpy.float64).repeat(size).reshape(size,size) | |
115 DCTmxT = numpy.pi * (DCTmx.transpose()+0.5) / size | |
116 DCTmxT = (1.0/sqrt( size / 2.0)) * cos(DCTmx * DCTmxT) | |
117 DCTmxT[0] = DCTmxT[0] * (sqrt(2.0)/2.0) | |
118 return DCTmxT | |
119 | |
120 def dct(self,data_matrix): | |
121 '''Compute DCT of input matrix.''' | |
122 return numpy.dot(self.DCTMatrix,data_matrix) | |
123 | |
124 def getMFCCs(self,warpedSpectrum,cn=True): | |
125 '''Compute MFCC coefficients from Mel warped magnitude spectrum.''' | |
126 mfccs=self.dct(numpy.log(warpedSpectrum)) | |
127 if cn is False : mfccs[0] = 0.0 | |
128 return mfccs | |
129 | |
130 | |
131 class PyMFCC_oldstyle(melScaling): | |
132 | |
133 def __init__(self,inputSampleRate): | |
134 self.vampy_flags = 1 # vf_DEBUG = 1 | |
135 self.m_inputSampleRate = inputSampleRate | |
136 self.m_stepSize = 1024 | |
137 self.m_blockSize = 2048 | |
138 self.m_channels = 1 | |
139 self.numBands = 40 | |
140 self.cnull = 1 | |
141 melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize/2,self.numBands) | |
142 | |
143 def initialise(self,channels,stepSize,blockSize): | |
144 self.m_channels = channels | |
145 self.m_stepSize = stepSize | |
146 self.m_blockSize = blockSize | |
147 self.window = numpy.hamming(blockSize) | |
148 melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize/2,self.numBands) | |
149 return True | |
150 | |
151 def getMaker(self): | |
152 return 'Vampy Test Plugins' | |
153 | |
154 def getCopyright(self): | |
155 return 'Plugin By George Fazekas' | |
156 | |
157 def getName(self): | |
158 return 'Vampy Old Style MFCC Plugin' | |
159 | |
160 def getIdentifier(self): | |
161 return 'vampy-mfcc-test-old' | |
162 | |
163 def getDescription(self): | |
164 return 'A simple MFCC plugin. (using the old syntax)' | |
165 | |
166 def getMaxChannelCount(self): | |
167 return 1 | |
168 | |
169 def getInputDomain(self): | |
170 return 'TimeDomain' | |
171 | |
172 def getPreferredBlockSize(self): | |
173 return 2048 | |
174 | |
175 def getPreferredStepSize(self): | |
176 return 512 | |
177 | |
178 def getOutputDescriptors(self): | |
179 | |
180 Generic={ | |
181 'hasFixedBinCount':True, | |
182 'binCount':int(self.numBands)-self.cnull, | |
183 'hasKnownExtents':False, | |
184 'isQuantized':True, | |
185 'sampleType':'OneSamplePerStep' | |
186 } | |
187 | |
188 MFCC=Generic.copy() | |
189 MFCC.update({ | |
190 'identifier':'mfccs', | |
191 'name':'MFCCs', | |
192 'description':'MFCC Coefficients', | |
193 'binNames':map(lambda x: 'C '+str(x),range(self.cnull,int(self.numBands))), | |
194 'unit':'' | |
195 }) | |
196 | |
197 warpedSpectrum=Generic.copy() | |
198 warpedSpectrum.update({ | |
199 'identifier':'warped-fft', | |
200 'name':'Mel Scaled Spectrum', | |
201 'description':'Mel Scaled Magnitide Spectrum', | |
202 'unit':'Mel' | |
203 }) | |
204 | |
205 melFilter=Generic.copy() | |
206 melFilter.update({ | |
207 'identifier':'mel-filter', | |
208 'name':'Mel Filter Matrix', | |
209 'description':'Returns the created filter matrix.', | |
210 'sampleType':'FixedSampleRate', | |
211 'sampleRate':self.m_inputSampleRate/self.m_stepSize, | |
212 'unit':'' | |
213 }) | |
214 | |
215 return [MFCC,warpedSpectrum,melFilter] | |
216 | |
217 def getParameterDescriptors(self): | |
218 melbands = { | |
219 'identifier':'melbands', | |
220 'name':'Number of bands (coefficients)', | |
221 'description':'Set the number of coefficients.', | |
222 'unit':'', | |
223 'minValue':2.0, | |
224 'maxValue':128.0, | |
225 'defaultValue':40.0, | |
226 'isQuantized':True, | |
227 'quantizeStep':1.0 | |
228 } | |
229 | |
230 cnull = { | |
231 'identifier':'cnull', | |
232 'name':'Return C0', | |
233 'description':'Select if the DC coefficient is required.', | |
234 'unit':'', | |
235 'minValue':0.0, | |
236 'maxValue':1.0, | |
237 'defaultValue':0.0, | |
238 'isQuantized':True, | |
239 'quantizeStep':1.0 | |
240 } | |
241 | |
242 minHz = { | |
243 'identifier':'minHz', | |
244 'name':'minimum frequency', | |
245 'description':'Set the lower frequency bound.', | |
246 'unit':'Hz', | |
247 'minValue':0.0, | |
248 'maxValue':24000.0, | |
249 'defaultValue':0.0, | |
250 'isQuantized':True, | |
251 'quantizeStep':1.0 | |
252 } | |
253 | |
254 maxHz = { | |
255 'identifier':'maxHz', | |
256 'name':'maximum frequency', | |
257 'description':'Set the upper frequency bound.', | |
258 'unit':'Hz', | |
259 'minValue':100.0, | |
260 'maxValue':24000.0, | |
261 'defaultValue':11025.0, | |
262 'isQuantized':True, | |
263 'quantizeStep':100.0 | |
264 } | |
265 | |
266 return [melbands,minHz,maxHz,cnull] | |
267 | |
268 def setParameter(self,paramid,newval): | |
269 self.valid = False | |
270 if paramid == 'minHz' : | |
271 if newval < self.maxHz and newval < self.NqHz : | |
272 self.minHz = float(newval) | |
273 print 'minHz: ', self.minHz | |
274 if paramid == 'maxHz' : | |
275 print 'trying to set maxHz to: ',newval | |
276 if newval < self.NqHz and newval > self.minHz+1000 : | |
277 self.maxHz = float(newval) | |
278 else : | |
279 self.maxHz = self.NqHz | |
280 print 'set to: ',self.maxHz | |
281 if paramid == 'cnull' : | |
282 self.cnull = int(not int(newval)) | |
283 if paramid == 'melbands' : | |
284 self.numBands = int(newval) | |
285 return | |
286 | |
287 def getParameter(self,paramid): | |
288 if paramid == 'minHz' : | |
289 return float(self.minHz) | |
290 if paramid == 'maxHz' : | |
291 return float(self.maxHz) | |
292 if paramid == 'cnull' : | |
293 return float(not int(self.cnull)) | |
294 if paramid == 'melbands' : | |
295 return float(self.numBands) | |
296 else: | |
297 return 0.0 | |
298 | |
299 def processN(self,membuffer,frameSampleStart): | |
300 | |
301 # recalculate the filter and DCT matrices if needed | |
302 if not self.update() : return [] | |
303 | |
304 fftsize = self.m_blockSize | |
305 audioSamples = frombuffer(membuffer[0],float32) | |
306 | |
307 complexSpectrum = fft(self.window*audioSamples,fftsize) | |
308 #complexSpectrum = frombuffer(membuffer[0],complex64,-1,8) | |
309 | |
310 magnitudeSpectrum = abs(complexSpectrum)[0:fftsize/2] / (fftsize/2) | |
311 melSpectrum = self.warpSpectrum(magnitudeSpectrum) | |
312 melCepstrum = self.getMFCCs(melSpectrum,cn=True) | |
313 | |
314 output_melCepstrum = [{ | |
315 'hasTimestamp':False, | |
316 'values':melCepstrum[self.cnull:].tolist() | |
317 }] | |
318 | |
319 output_melSpectrum = [{ | |
320 'hasTimestamp':False, | |
321 'values':melSpectrum.tolist() | |
322 }] | |
323 | |
324 return [output_melCepstrum,output_melSpectrum,[]] | |
325 | |
326 | |
327 def getRemainingFeatures(self): | |
328 if not self.update() : return [] | |
329 frameSampleStart = 0 | |
330 output_melFilter = [] | |
331 | |
332 while True: | |
333 try : | |
334 melFilter = self.filterIter.next() | |
335 output_melFilter.append({ | |
336 'hasTimestamp':True, | |
337 'timeStamp':frameSampleStart, | |
338 'values':melFilter.tolist() | |
339 }) | |
340 frameSampleStart += self.m_stepSize | |
341 except StopIteration : | |
342 break; | |
343 | |
344 return [[],[],output_melFilter] | |
345 | |
346 | |
347 # ============================================ | |
348 # Simple Unit Tests | |
349 # ============================================ | |
350 | |
351 def main(): | |
352 | |
353 dct = melScaling(44100,2048,numBands=4) | |
354 dct.update() | |
355 print dct.DCTMatrix | |
356 # print dct.getMFCCs(numpy.array([0.0,0.1,0.0,-0.1],numpy.float64)) | |
357 sys.exit(-1) | |
358 | |
359 if __name__ == '__main__': | |
360 main() | |
361 |