comparison Example VamPy plugins/test/PyMFCC_legacy.py @ 52:d56f48aafb99

Updated some example plugins.
author fazekasgy
date Thu, 08 Oct 2009 08:59:08 +0000
parents
children
comparison
equal deleted inserted replaced
51:c1e4f706ca9a 52:d56f48aafb99
1 '''PyMFCC_legacy.py - This example Vampy plugin demonstrates
2 how to return spectrogram-like features.
3
4 This plugin has frequency domain input and is using
5 the legacy input interface: the frequency samples are
6 passed as python list of complex numbers.
7
8 Note: This is not the advised way of writing plugins,
9 since the input interfaces provided for Numpy such as the
10 Numpy Array interface (flag: vf_ARRAY) are much faster.
11
12 This plugin is using Numpy, but it does not rely on Vampy's
13 capability of passing Numpy arrays to the process directly.
14 However, it returns Numpy arrays from the process.
15
16 Centre for Digital Music, Queen Mary University of London.
17 Copyright 2006 Gyorgy Fazekas, QMUL.
18 (See Vamp API for licence information.)
19
20 Constants for Mel frequency conversion and filter
21 centre calculation are taken from the GNU GPL licenced
22 Freespeech library. Copyright (C) 1999 Jean-Marc Valin
23 '''
24
25 import sys,numpy
26 from numpy import log,exp,floor,sum
27 from numpy import *
28 from numpy.fft import *
29 import vampy
30 from vampy import *
31
32
class melScaling(object):
    '''Mel filter bank and DCT helper for computing MFCCs.

    The constructor only stores the parameters. The filter matrix and the
    DCT matrix are built lazily by update(), so attributes such as minHz,
    maxHz or numBands may still be changed before the first update() call.
    '''

    def __init__(self,sampleRate,inputSize,numBands,minHz = 0,maxHz = None):
        '''Initialise frequency warping and DCT matrix.
        Parameters:
        sampleRate: audio sample rate
        inputSize: length of magnitude spectrum (half of FFT size assumed)
        numBands: number of mel Bands (MFCCs)
        minHz: lower bound of warping (default = DC)
        maxHz: higher bound of warping (default = Nyquist frequency)
        '''
        self.sampleRate = sampleRate
        self.NqHz = sampleRate / 2.0
        self.minHz = minHz
        self.maxHz = self.NqHz if maxHz is None else maxHz
        self.inputSize = inputSize
        self.numBands = numBands
        # Nothing has been computed yet; update() flips these flags.
        self.valid = False
        self.updated = False

    def update(self):
        '''Build the Mel filter matrix and the DCT matrix (first call only).

        Returns the validity flag. Once the method has run, later calls
        return the stored flag without recalculating anything, so that it
        can be called cheaply from every vamp process call.

        Raises Exception when maxHz is above the Nyquist frequency. In that
        case the object stays marked as updated but invalid, so subsequent
        calls return False.
        '''
        if self.updated:
            return self.valid
        self.updated = True
        self.valid = False
        # The formatted output is identical to the Python 2 print
        # statements 'print label, value' it replaces.
        print('Updating parameters and recalculating filters: ')
        print('Nyquist:  %s' % (self.NqHz,))

        if self.maxHz > self.NqHz:
            raise Exception('Maximum frequency must be smaller than the Nyquist frequency')

        # Hz -> Mel conversion of both band edges.
        self.maxMel = 1000*numpy.log(1+self.maxHz/700.0)/numpy.log(1+1000.0/700.0)
        self.minMel = 1000*numpy.log(1+self.minHz/700.0)/numpy.log(1+1000.0/700.0)
        print('minHz:%s\nmaxHz:%s\nminMel:%s\nmaxMel:%s\n' %(self.minHz,self.maxHz,self.minMel,self.maxMel))
        self.filterMatrix = self.getFilterMatrix(self.inputSize,self.numBands)
        self.DCTMatrix = self.getDCTMatrix(self.numBands)
        # Row iterator over the filter matrix, consumed by subclasses.
        self.filterIter = iter(self.filterMatrix)
        self.valid = True
        return self.valid

    def getFilterCentres(self,inputSize,numBands):
        '''Calculate Mel filter centres around FFT bins.
        This function calculates two extra bands at the edges for
        finding the starting and end point of the first and last
        actual filters.
        Returns an integer array of numBands+2 bin indices.'''
        bandIndex = numpy.arange(numBands+2)
        # Equally spaced points on the Mel scale between minMel and maxMel.
        centresMel = bandIndex * (self.maxMel-self.minMel)/(numBands+1) + self.minMel
        # Mel -> Hz -> nearest FFT bin (the 0.5 offset makes floor() round).
        centresHzOver700 = numpy.exp(centresMel*numpy.log(1+1000.0/700.0)/1000.0)-1
        centresBin = numpy.floor(0.5 + 700.0*inputSize*centresHzOver700/self.NqHz)
        return centresBin.astype(int)

    def getFilterMatrix(self,inputSize,numBands):
        '''Compose the Mel scaling matrix.
        Returns an array of shape (inputSize, numBands), one column per
        filter. The filter centres are kept in self.filterCentres.'''
        filters = numpy.zeros((numBands,inputSize))
        centres = self.getFilterCentres(inputSize,numBands)
        self.filterCentres = centres
        # Filter i spans centres[i]..centres[i+2] and peaks at centres[i+1].
        for row,start,centre,end in zip(filters,centres[:-2],centres[1:-1],centres[2:]):
            self.setFilter(row,start,centre,end)
        return filters.transpose()

    def setFilter(self,filt,filterStart,filterCentre,filterEnd):
        '''Calculate a single Mel filter.
        Writes a triangle into filt (in place): a ramp from 0 towards 1
        over [filterStart, filterCentre) and from 1 towards 0 over
        [filterCentre, filterEnd). Empty ranges leave filt untouched.'''
        riseLength = numpy.float32(filterCentre-filterStart)
        fallLength = numpy.float32(filterEnd-filterCentre)
        rising = (numpy.arange(filterStart,filterCentre)-filterStart)/riseLength
        falling = (filterEnd-numpy.arange(filterCentre,filterEnd))/fallLength
        filt[filterStart:filterCentre] = rising
        filt[filterCentre:filterEnd] = falling

    def warpSpectrum(self,magnitudeSpectrum):
        '''Compute the Mel scaled spectrum.'''
        return numpy.dot(magnitudeSpectrum,self.filterMatrix)

    def getDCTMatrix(self,size):
        '''Calculate the square DCT transform matrix. Results are
        equivalent to Matlab dctmtx(n) but with 64 bit precision.'''
        index = numpy.arange(size,dtype=numpy.float64)
        rows = index.reshape(size,1)
        cols = index.reshape(1,size)
        angles = numpy.pi * (cols+0.5) / size
        DCTmx = (1.0/numpy.sqrt( size / 2.0)) * numpy.cos(rows * angles)
        # The first basis vector gets the extra 1/sqrt(2) normalisation.
        DCTmx[0] = DCTmx[0] * (numpy.sqrt(2.0)/2.0)
        return DCTmx

    def dct(self,data_matrix):
        '''Compute DCT of input matrix.'''
        return numpy.dot(self.DCTMatrix,data_matrix)

    def getMFCCs(self,warpedSpectrum,cn=True):
        '''Compute MFCC coefficients from Mel warped magnitude spectrum.

        The spectrum is floored at the smallest positive float64 before
        the logarithm is taken. Without the floor, a band with zero energy
        (digital silence or an empty filter) yields log(0) = -inf, which
        the DCT spreads into inf/NaN values across all coefficients.

        When cn is False the first coefficient (C0) is set to zero.
        '''
        smallest = numpy.finfo(numpy.float64).tiny
        logSpectrum = numpy.log(numpy.maximum(warpedSpectrum,smallest))
        mfccs = self.dct(logSpectrum)
        if cn is False:
            mfccs[0] = 0.0
        return mfccs
138
139
class PyMFCC_legacy(melScaling):
    '''Vampy example plugin returning MFCCs and a Mel scaled spectrum.

    The plugin asks for frequency domain input and reads each channel's
    input buffer as a sequence of complex numbers (legacy interface).
    Outputs, in order: MFCCs, Mel scaled magnitude spectrum, and the Mel
    filter matrix (returned from getRemainingFeatures only).
    '''

    def __init__(self,inputSampleRate):
        '''Set plugin defaults and the initial Mel scaling parameters.'''
        # flags for setting some Vampy options
        self.vampy_flags = vf_DEBUG | vf_REALTIME

        self.m_inputSampleRate = int(inputSampleRate)
        self.m_stepSize = 512
        self.m_blockSize = 2048
        self.m_channels = 1
        self.numBands = 40
        # cnull is the index of the first returned coefficient:
        # 1 skips C0, 0 includes it (inverse of the 'cnull' parameter).
        self.cnull = 1
        self.two_ch = False
        melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize//2,self.numBands)

    def initialise(self,channels,stepSize,blockSize):
        '''Store the processing configuration and reset the Mel scaling.

        The current minHz/maxHz values are passed on so that frequency
        bounds set with setParameter() before initialisation are kept;
        re-running the base constructor without them would reset the
        bounds to 0 Hz .. Nyquist.
        '''
        self.m_channels = channels
        self.m_stepSize = stepSize
        self.m_blockSize = blockSize
        self.window = numpy.hamming(blockSize)
        melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize//2,
                            self.numBands,self.minHz,self.maxHz)
        return True

    def getMaker(self):
        return 'Vampy Test Plugins'

    def getCopyright(self):
        return 'Plugin By George Fazekas'

    def getName(self):
        return 'Vampy Legacy FrequencyDomain MFCC Plugin'

    def getIdentifier(self):
        return 'vampy-mfcc-test-legacy'

    def getDescription(self):
        return 'Vampy FrequencyDomain MFCC Plugin using the Legacy interface.'

    def getMaxChannelCount(self):
        return 2

    def getInputDomain(self):
        return FrequencyDomain

    def getPreferredBlockSize(self):
        return 2048

    def getPreferredStepSize(self):
        return 512

    def getOutputDescriptors(self):
        '''Describe the three outputs: MFCCs, warped spectrum, filter matrix.'''
        bandCount = int(self.numBands)
        coeffCount = bandCount - self.cnull
        # Two-channel mode stacks the per-channel results side by side.
        stacked = self.two_ch and self.m_channels == 2

        Generic = OutputDescriptor()
        Generic.hasFixedBinCount=True
        Generic.binCount=coeffCount
        Generic.hasKnownExtents=False
        Generic.isQuantized=True
        Generic.sampleType = OneSamplePerStep

        # note the inheritance of attributes (use is optional)
        MFCC = OutputDescriptor(Generic)
        MFCC.identifier = 'mfccs'
        MFCC.name = 'MFCCs'
        MFCC.description = 'MFCC Coefficients'
        MFCC.binNames=['C '+str(x) for x in range(self.cnull,bandCount)]
        MFCC.unit = None
        if stacked:
            MFCC.binCount = self.m_channels * coeffCount
        else:
            MFCC.binCount = coeffCount

        warpedSpectrum = OutputDescriptor(Generic)
        warpedSpectrum.identifier='warped-fft'
        warpedSpectrum.name='Mel Scaled Spectrum'
        warpedSpectrum.description='Mel Scaled Magnitide Spectrum'
        warpedSpectrum.unit='Mel'
        if stacked:
            warpedSpectrum.binCount = self.m_channels * bandCount
        else:
            warpedSpectrum.binCount = bandCount

        melFilter = OutputDescriptor(Generic)
        melFilter.identifier = 'mel-filter-matrix'
        melFilter.sampleType='FixedSampleRate'
        # Float division: integer division would truncate the frame rate
        # (e.g. 44100/512 -> 86 instead of 86.13) under Python 2.
        melFilter.sampleRate=float(self.m_inputSampleRate)/self.m_stepSize
        # Each returned feature is one row of the filter matrix, which
        # holds one value per band regardless of the C0 setting.
        melFilter.binCount = bandCount
        melFilter.name='Mel Filter Matrix'
        melFilter.description='Returns the created filter matrix in getRemainingFeatures.'
        melFilter.unit = None

        return OutputList(MFCC,warpedSpectrum,melFilter)

    def getParameterDescriptors(self):
        '''Describe the parameters: melbands, minHz, maxHz, cnull, two_ch.'''
        melbands = ParameterDescriptor()
        melbands.identifier='melbands'
        melbands.name='Number of bands (coefficients)'
        melbands.description='Set the number of coefficients.'
        melbands.unit = ''
        melbands.minValue = 2
        melbands.maxValue = 128
        melbands.defaultValue = 40
        melbands.isQuantized = True
        melbands.quantizeStep = 1

        cnull = ParameterDescriptor()
        cnull.identifier='cnull'
        cnull.name='Return C0'
        cnull.description='Select if the DC coefficient is required.'
        cnull.unit = None
        cnull.minValue = 0
        cnull.maxValue = 1
        cnull.defaultValue = 0
        cnull.isQuantized = True
        cnull.quantizeStep = 1

        # two_ch starts as a copy of the cnull descriptor (0/1 switch).
        two_ch = ParameterDescriptor(cnull)
        two_ch.identifier='two_ch'
        two_ch.name='Process channels separately'
        two_ch.description='Process two channel files separately.'
        two_ch.defaultValue = False

        minHz = ParameterDescriptor()
        minHz.identifier='minHz'
        minHz.name='minimum frequency'
        minHz.description='Set the lower frequency bound.'
        minHz.unit='Hz'
        minHz.minValue = 0
        minHz.maxValue = 24000
        minHz.defaultValue = 0
        minHz.isQuantized = True
        minHz.quantizeStep = 1.0

        maxHz = ParameterDescriptor()
        maxHz.identifier='maxHz'
        maxHz.description='Set the upper frequency bound.'
        maxHz.name='maximum frequency'
        maxHz.unit='Hz'
        maxHz.minValue = 100
        maxHz.maxValue = 24000
        maxHz.defaultValue = 11025
        maxHz.isQuantized = True
        maxHz.quantizeStep = 100

        return ParameterList(melbands,minHz,maxHz,cnull,two_ch)

    def setParameter(self,paramid,newval):
        '''Set one parameter by identifier; unknown identifiers are ignored.

        Every call marks the Mel scaling as invalid. Out-of-range minHz
        values are ignored; out-of-range maxHz values fall back to the
        Nyquist frequency.
        '''
        self.valid = False
        if paramid == 'minHz':
            if newval < self.maxHz and newval < self.NqHz:
                self.minHz = float(newval)
            print('minHz:  %s' % (self.minHz,))
        elif paramid == 'maxHz':
            print('trying to set maxHz to:  %s' % (newval,))
            # Require at least 1 kHz between the lower and upper bound.
            if newval < self.NqHz and newval > self.minHz+1000:
                self.maxHz = float(newval)
            else:
                self.maxHz = self.NqHz
            print('set to:  %s' % (self.maxHz,))
        elif paramid == 'cnull':
            # Stored inverted: parameter 1 ("return C0") -> start index 0.
            self.cnull = int(not int(newval))
        elif paramid == 'melbands':
            self.numBands = int(newval)
        elif paramid == 'two_ch':
            self.two_ch = bool(newval)
        return

    def getParameter(self,paramid):
        '''Return the current value of a parameter as float (0.0 if unknown).'''
        if paramid == 'minHz':
            return float(self.minHz)
        if paramid == 'maxHz':
            return float(self.maxHz)
        if paramid == 'cnull':
            # Undo the inversion applied in setParameter.
            return float(not int(self.cnull))
        if paramid == 'melbands':
            return float(self.numBands)
        if paramid == 'two_ch':
            return float(self.two_ch)
        return 0.0

    def _magnitudeSpectrum(self,inputbuffer):
        '''Return the magnitudes of the first blockSize/2 bins of one channel.'''
        complexSpectrum = numpy.array(inputbuffer)
        return numpy.absolute(complexSpectrum)[0:self.m_blockSize//2]

    def process(self,inputbuffers,timestamp):
        '''Compute the MFCCs and the Mel spectrum of one input block.

        With more than one channel the magnitude spectra of the first two
        channels are averaged, unless separate two-channel processing is
        enabled, in which case process2ch() is used. Returns None when the
        Mel scaling is not valid.
        '''
        if not self.update():
            return None

        if self.m_channels == 2 and self.two_ch:
            return self.process2ch(inputbuffers,timestamp)

        magnitudeSpectrum = self._magnitudeSpectrum(inputbuffers[0])
        if self.m_channels > 1:
            # take the mean of the two magnitude spectra
            second = self._magnitudeSpectrum(inputbuffers[1])
            magnitudeSpectrum = (magnitudeSpectrum + second) / 2

        # do the computation
        melSpectrum = self.warpSpectrum(magnitudeSpectrum)
        melCepstrum = self.getMFCCs(melSpectrum,cn=True)

        outputs = FeatureSet()
        outputs[0] = Feature(melCepstrum[self.cnull:])
        outputs[1] = Feature(melSpectrum)
        return outputs

    def process2ch(self,inputbuffers,timestamp):
        '''Process two channels separately and stack the returned arrays.

        In each stacked output the values of channel 1 come first,
        followed by those of channel 0.
        '''
        melSpectra = []
        melCepstra = []
        for channel in (0,1):
            spectrum = self.warpSpectrum(self._magnitudeSpectrum(inputbuffers[channel]))
            melSpectra.append(spectrum)
            melCepstra.append(self.getMFCCs(spectrum,cn=True)[self.cnull:])

        outputs = FeatureSet()
        outputs[0] = Feature(numpy.hstack((melCepstra[1],melCepstra[0])))
        outputs[1] = Feature(numpy.hstack((melSpectra[1],melSpectra[0])))
        return outputs

    def getRemainingFeatures(self):
        '''Return the Mel filter matrix as a list of timestamped features.

        One feature is produced per remaining row of the filter matrix
        iterator, with timestamps advancing by the step size. Returns an
        empty list when the Mel scaling is not valid.
        '''
        if not self.update():
            return []
        frameSampleStart = 0

        output_featureSet = FeatureSet()

        # the filter is the third output (index starts from zero)
        output_featureSet[2] = flist = FeatureList()

        for filterRow in self.filterIter:
            f = Feature()
            f.hasTimestamp = True
            f.timestamp = frame2RealTime(frameSampleStart,self.m_inputSampleRate)
            f.values = filterRow
            flist.append(f)
            frameSampleStart += self.m_stepSize

        return output_featureSet