Mercurial > hg > vampy
comparison Example VamPy plugins/test/PyMFCC_buffer.py @ 37:27bab3a16c9a vampy2final
new branch Vampy2final
| author | fazekasgy |
|---|---|
| date | Mon, 05 Oct 2009 11:28:00 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 37:27bab3a16c9a |
|---|---|
| 1 '''PyMFCC_buffer.py - This example Vampy plugin demonstrates | |
| 2 how to return spectrogram-like features. | |
| 3 | |
| 4 This plugin uses the numpy BUFFER interface and | |
| 5 frequency domain input. Flag: vf_BUFFER | |
| 6 | |
| 7 Centre for Digital Music, Queen Mary University of London. | |
| 8 Copyright 2006 Gyorgy Fazekas, QMUL. | |
| 9 (See Vamp API for licence information.) | |
| 10 | |
| 11 Constants for Mel frequency conversion and filter | |
| 12 centre calculation are taken from the GNU GPL licensed | |
| 13 Freespeech library. Copyright (C) 1999 Jean-Marc Valin | |
| 14 ''' | |
| 15 | |
| 16 import sys,numpy | |
| 17 from numpy import log,exp,floor,sum | |
| 18 from numpy import * | |
| 19 from numpy.fft import * | |
| 20 import vampy | |
| 21 from vampy import * | |
| 22 | |
class melScaling(object):
    '''Mel filter bank and DCT matrix for computing MFCCs from magnitude spectra.

    The filters and the DCT matrix are built lazily by update(); the
    constructor only stores the configuration.
    '''

    def __init__(self,sampleRate,inputSize,numBands,minHz = 0,maxHz = None):
        '''Initialise frequency warping and DCT matrix.
        Parameters:
        sampleRate: audio sample rate
        inputSize: length of magnitude spectrum (half of FFT size assumed)
        numBands: number of mel Bands (MFCCs)
        minHz: lower bound of warping (default = DC)
        maxHz: higher bound of warping (default = Nyquist frequency)
        '''
        self.sampleRate = sampleRate
        self.NqHz = sampleRate / 2.0
        self.minHz = minHz
        if maxHz is None:
            maxHz = self.NqHz
        self.maxHz = maxHz
        self.inputSize = inputSize
        self.numBands = numBands
        # 'valid' reports whether the filters are usable, 'updated' whether
        # update() has already been attempted with the current settings.
        self.valid = False
        self.updated = False

    def _hzToMel(self,hz):
        '''Convert a frequency in Hz to the Mel scale used by this class.'''
        return 1000*numpy.log(1+hz/700.0)/numpy.log(1+1000.0/700.0)

    def update(self):
        '''Build the Mel filter matrix and the DCT matrix (once).

        Returns the validity flag. Repeated calls return the cached flag
        without recomputing until 'updated' is reset to False.
        Raises Exception if maxHz is above the Nyquist frequency.
        '''
        # make sure this will run only once if called from a vamp process
        if self.updated:
            return self.valid
        self.updated = True
        self.valid = False
        # Single-argument print calls behave the same on Python 2 and 3.
        print('Updating parameters and recalculating filters: ')
        print('Nyquist:  %s' % self.NqHz)

        if self.maxHz > self.NqHz:
            raise Exception('Maximum frequency must be smaller than the Nyquist frequency')

        self.maxMel = self._hzToMel(self.maxHz)
        self.minMel = self._hzToMel(self.minHz)
        print('minHz:%s\nmaxHz:%s\nminMel:%s\nmaxMel:%s\n' %(self.minHz,self.maxHz,self.minMel,self.maxMel))
        self.filterMatrix = self.getFilterMatrix(self.inputSize,self.numBands)
        self.DCTMatrix = self.getDCTMatrix(self.numBands)
        # Row iterator over the filter matrix, stored for later consumers.
        self.filterIter = iter(self.filterMatrix)
        self.valid = True
        return self.valid

    def getFilterCentres(self,inputSize,numBands):
        '''Calculate Mel filter centres around FFT bins.
        This function calculates two extra bands at the edges for
        finding the starting and end point of the first and last
        actual filters. Returns an integer array of numBands+2 bin indices.'''
        # Equally spaced points on the Mel axis between minMel and maxMel.
        centresMel = numpy.arange(numBands+2) * (self.maxMel-self.minMel)/(numBands+1) + self.minMel
        # Map back from Mel to Hz, scale to bin indices and round to nearest.
        centresHzOver700 = numpy.exp(centresMel*numpy.log(1+1000.0/700.0)/1000.0)-1
        centresBin = numpy.floor(0.5 + 700.0*inputSize*centresHzOver700/self.NqHz)
        return centresBin.astype(int)

    def getFilterMatrix(self,inputSize,numBands):
        '''Compose the Mel scaling matrix.

        Returns an (inputSize, numBands) array; column i holds filter i.
        The filter centres are also stored in self.filterCentres.
        '''
        # numpy.zeros and range need true integers for the shape and count.
        inputSize = int(inputSize)
        numBands = int(numBands)
        filterMatrix = numpy.zeros((numBands,inputSize))
        self.filterCentres = self.getFilterCentres(inputSize,numBands)
        for band in range(numBands):
            # Each filter spans three consecutive centres: start, peak, end.
            start,centre,end = self.filterCentres[band:band+3]
            self.setFilter(filterMatrix[band],start,centre,end)
        return filterMatrix.transpose()

    def setFilter(self,filt,filterStart,filterCentre,filterEnd):
        '''Calculate a single Mel filter.

        Writes a rising ramp into filt[filterStart:filterCentre] and a
        falling ramp (starting at 1.0) into filt[filterCentre:filterEnd],
        modifying filt in place.
        '''
        riseWidth = float(filterCentre-filterStart)
        fallWidth = float(filterEnd-filterCentre)
        # When a width is zero the matching index range is empty, so no
        # element is ever divided by zero.
        rising = (numpy.arange(filterStart,filterCentre)-filterStart)/riseWidth
        falling = (filterEnd-numpy.arange(filterCentre,filterEnd))/fallWidth
        filt[filterStart:filterCentre] = rising
        filt[filterCentre:filterEnd] = falling

    def warpSpectrum(self,magnitudeSpectrum):
        '''Compute the Mel scaled spectrum.'''
        return numpy.dot(magnitudeSpectrum,self.filterMatrix)

    def getDCTMatrix(self,size):
        '''Calculate the square DCT transform matrix. Results are
        equivalent to Matlab dctmtx(n) but with 64 bit precision.'''
        index = numpy.arange(size,dtype=numpy.float64)
        # Element [i][j] is i * pi * (j + 0.5) / size.
        angles = numpy.outer(index,numpy.pi * (index+0.5) / size)
        DCTmx = (1.0/numpy.sqrt( size / 2.0)) * numpy.cos(angles)
        # The first row gets the extra 1/sqrt(2) factor so rows are orthonormal.
        DCTmx[0] = DCTmx[0] * (numpy.sqrt(2.0)/2.0)
        return DCTmx

    def dct(self,data_matrix):
        '''Compute DCT of input matrix.'''
        return numpy.dot(self.DCTMatrix,data_matrix)

    def getMFCCs(self,warpedSpectrum,cn=True):
        '''Compute MFCC coefficients from Mel warped magnitude spectrum.

        Parameters:
        warpedSpectrum: Mel scaled magnitude spectrum
        cn: when false, the first coefficient (C0) is set to zero

        Zero-valued bands are floored at the smallest positive float64
        before taking the logarithm, so the result is always finite.
        '''
        spectrum = numpy.asarray(warpedSpectrum,dtype=numpy.float64)
        # log(0) is -inf and would turn the coefficients into -inf/NaN.
        spectrum = numpy.maximum(spectrum,numpy.finfo(numpy.float64).tiny)
        mfccs = self.dct(numpy.log(spectrum))
        if not cn:
            mfccs[0] = 0.0
        return mfccs
| 127 | |
| 128 | |
class PyMFCC_buffer(melScaling):
    '''Example Vampy plugin returning MFCCs and a Mel scaled spectrum.

    Uses frequency domain input read through the buffer interface
    (vf_BUFFER). The Mel filter matrix itself is returned as a third
    output from getRemainingFeatures().
    '''

    def __init__(self,inputSampleRate):
        '''Store the sample rate and set up default plugin settings.'''

        # flags for setting some Vampy options
        self.vampy_flags = vf_DEBUG | vf_BUFFER | vf_REALTIME

        self.m_inputSampleRate = int(inputSampleRate)
        self.m_stepSize = 512
        self.m_blockSize = 2048
        self.m_channels = 1
        self.numBands = 40
        # cnull is the index of the first returned coefficient:
        # 1 skips C0, 0 includes it.
        self.cnull = 1
        self.two_ch = False
        melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize//2,self.numBands)

    def initialise(self,channels,stepSize,blockSize):
        '''Store the processing configuration and reset the Mel scaling state.

        Returns True.
        '''
        self.m_channels = channels
        self.m_stepSize = stepSize
        self.m_blockSize = blockSize
        self.window = numpy.hamming(blockSize)
        # Pass the current frequency bounds through, so that values set via
        # setParameter() before this call are not reset to the defaults.
        # NOTE(review): hosts are presumed to set parameters before
        # initialise() - confirm against the Vamp host in use.
        melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize//2,self.numBands,self.minHz,self.maxHz)
        return True

    def getMaker(self):
        return 'Vampy Test Plugins'

    def getCopyright(self):
        return 'Plugin By George Fazekas'

    def getName(self):
        return 'Vampy Buffer MFCC Plugin'

    def getIdentifier(self):
        return 'vampy-mfcc-test-buffer'

    def getDescription(self):
        return 'A simple MFCC plugin. (using the Buffer interface)'

    def getMaxChannelCount(self):
        return 2

    def getInputDomain(self):
        return FrequencyDomain

    def getPreferredBlockSize(self):
        return 2048

    def getPreferredStepSize(self):
        return 512

    def getOutputDescriptors(self):
        '''Describe the three outputs: MFCCs, warped spectrum, filter matrix.'''

        numBands = int(self.numBands)
        # Stacked two channel output doubles the number of bins.
        stacked = self.two_ch and self.m_channels == 2

        Generic = OutputDescriptor()
        Generic.hasFixedBinCount=True
        Generic.binCount=numBands-self.cnull
        Generic.hasKnownExtents=False
        Generic.isQuantized=True
        Generic.sampleType = OneSamplePerStep

        # note the inheritance of attributes (use is optional)
        MFCC = OutputDescriptor(Generic)
        MFCC.identifier = 'mfccs'
        MFCC.name = 'MFCCs'
        MFCC.description = 'MFCC Coefficients'
        MFCC.binNames=['C '+str(x) for x in range(self.cnull,numBands)]
        MFCC.unit = None
        if stacked:
            MFCC.binCount = self.m_channels * (numBands-self.cnull)
        else:
            MFCC.binCount = self.numBands-self.cnull

        warpedSpectrum = OutputDescriptor(Generic)
        warpedSpectrum.identifier='warped-fft'
        warpedSpectrum.name='Mel Scaled Spectrum'
        warpedSpectrum.description='Mel Scaled Magnitide Spectrum'
        warpedSpectrum.unit='Mel'
        if stacked:
            warpedSpectrum.binCount = self.m_channels * numBands
        else:
            warpedSpectrum.binCount = self.numBands

        melFilter = OutputDescriptor(Generic)
        melFilter.identifier = 'mel-filter-matrix'
        melFilter.sampleType='FixedSampleRate'
        # Floor division keeps the integer result of the original
        # Python 2 '/' on two integers.
        melFilter.sampleRate=self.m_inputSampleRate//self.m_stepSize
        melFilter.name='Mel Filter Matrix'
        melFilter.description='Returns the created filter matrix in getRemainingFeatures.'
        melFilter.unit = None

        return OutputList(MFCC,warpedSpectrum,melFilter)

    def getParameterDescriptors(self):
        '''Describe the five parameters: melbands, minHz, maxHz, cnull, two_ch.'''

        melbands = ParameterDescriptor()
        melbands.identifier='melbands'
        melbands.name='Number of bands (coefficients)'
        melbands.description='Set the number of coefficients.'
        melbands.unit = ''
        melbands.minValue = 2
        melbands.maxValue = 128
        melbands.defaultValue = 40
        melbands.isQuantized = True
        melbands.quantizeStep = 1

        cnull = ParameterDescriptor()
        cnull.identifier='cnull'
        cnull.name='Return C0'
        cnull.description='Select if the DC coefficient is required.'
        cnull.unit = None
        cnull.minValue = 0
        cnull.maxValue = 1
        cnull.defaultValue = 0
        cnull.isQuantized = True
        cnull.quantizeStep = 1

        # two_ch is constructed from the cnull descriptor.
        two_ch = ParameterDescriptor(cnull)
        two_ch.identifier='two_ch'
        two_ch.name='Process channels separately'
        two_ch.description='Process two channel files separately.'
        two_ch.defaultValue = False

        minHz = ParameterDescriptor()
        minHz.identifier='minHz'
        minHz.name='minimum frequency'
        minHz.description='Set the lower frequency bound.'
        minHz.unit='Hz'
        minHz.minValue = 0
        minHz.maxValue = 24000
        minHz.defaultValue = 0
        minHz.isQuantized = True
        minHz.quantizeStep = 1.0

        maxHz = ParameterDescriptor()
        maxHz.identifier='maxHz'
        maxHz.description='Set the upper frequency bound.'
        maxHz.name='maximum frequency'
        maxHz.unit='Hz'
        maxHz.minValue = 100
        maxHz.maxValue = 24000
        maxHz.defaultValue = 11025
        maxHz.isQuantized = True
        maxHz.quantizeStep = 100

        return ParameterList(melbands,minHz,maxHz,cnull,two_ch)

    def setParameter(self,paramid,newval):
        '''Set one parameter and mark the filters as needing recalculation.

        Unknown identifiers are ignored. Out-of-range minHz values are
        ignored; out-of-range maxHz values fall back to the Nyquist frequency.
        '''
        self.valid = False
        # Clear 'updated' as well: update() returns the cached (now False)
        # validity flag while 'updated' is set, so process() would
        # otherwise return None after any parameter change.
        self.updated = False
        if paramid == 'minHz':
            if newval < self.maxHz and newval < self.NqHz:
                self.minHz = float(newval)
            print('minHz:  %s' % self.minHz)
        elif paramid == 'maxHz':
            print('trying to set maxHz to:  %s' % newval)
            if newval < self.NqHz and newval > self.minHz+1000:
                self.maxHz = float(newval)
            else:
                self.maxHz = self.NqHz
            print('set to:  %s' % self.maxHz)
        elif paramid == 'cnull':
            # The parameter means "return C0"; cnull stores the inverse,
            # the index of the first coefficient to return.
            self.cnull = int(not int(newval))
        elif paramid == 'melbands':
            self.numBands = int(newval)
        elif paramid == 'two_ch':
            self.two_ch = bool(newval)
        return

    def getParameter(self,paramid):
        '''Return the current value of a parameter as a float (0.0 if unknown).'''
        if paramid == 'minHz':
            return float(self.minHz)
        if paramid == 'maxHz':
            return float(self.maxHz)
        if paramid == 'cnull':
            return float(not int(self.cnull))
        if paramid == 'melbands':
            return float(self.numBands)
        if paramid == 'two_ch':
            return float(self.two_ch)
        return 0.0

    def _magnitudeSpectrum(self,inputbuffer):
        '''Return the first blockSize/2 magnitudes of one channel buffer.'''
        complexSpectrum = numpy.frombuffer(inputbuffer,numpy.complex64,-1,0)
        # Floor division keeps the slice bound an integer on Python 3 too.
        return numpy.abs(complexSpectrum)[0:self.m_blockSize//2]

    # numpy process using the buffer interface
    def process(self,inputbuffers,timestamp):
        '''Compute MFCCs (output 0) and the Mel spectrum (output 1) of a block.

        Returns None when update() reports invalid settings. With two
        channels the magnitude spectra are averaged, unless two_ch is set,
        in which case process2ch() handles the block.
        '''
        if not self.update():
            return None

        if self.m_channels == 2 and self.two_ch:
            return self.process2ch(inputbuffers,timestamp)

        if self.m_channels > 1:
            # take the mean of the two magnitude spectra
            magnitudeSpectrum0 = self._magnitudeSpectrum(inputbuffers[0])
            magnitudeSpectrum1 = self._magnitudeSpectrum(inputbuffers[1])
            magnitudeSpectrum = (magnitudeSpectrum0 + magnitudeSpectrum1) / 2
        else:
            magnitudeSpectrum = self._magnitudeSpectrum(inputbuffers[0])

        # do the computation
        melSpectrum = self.warpSpectrum(magnitudeSpectrum)
        melCepstrum = self.getMFCCs(melSpectrum,cn=True)

        # output feature set (the builtin dict type can also be used)
        outputs = FeatureSet()
        outputs[0] = Feature(melCepstrum[self.cnull:])
        outputs[1] = Feature(melSpectrum)

        return outputs

    # process two channel files (stack the returned arrays)
    def process2ch(self,inputbuffers,timestamp):
        '''Process both channels separately and stack the results.

        The values of channel 1 come first in each stacked output,
        followed by those of channel 0.
        '''
        magnitudeSpectrum0 = self._magnitudeSpectrum(inputbuffers[0])
        magnitudeSpectrum1 = self._magnitudeSpectrum(inputbuffers[1])

        # do the computations
        melSpectrum0 = self.warpSpectrum(magnitudeSpectrum0)
        melCepstrum0 = self.getMFCCs(melSpectrum0,cn=True)
        melSpectrum1 = self.warpSpectrum(magnitudeSpectrum1)
        melCepstrum1 = self.getMFCCs(melSpectrum1,cn=True)

        outputs = FeatureSet()
        outputs[0] = Feature(numpy.hstack((melCepstrum1[self.cnull:],melCepstrum0[self.cnull:])))
        outputs[1] = Feature(numpy.hstack((melSpectrum1,melSpectrum0)))

        return outputs

    def getRemainingFeatures(self):
        '''Return the rows of the Mel filter matrix as timestamped features.

        Returns an empty list when update() reports invalid settings.
        Rows are read from self.filterIter, so rows already consumed are
        not returned again.
        '''
        if not self.update():
            return []

        output_featureSet = FeatureSet()

        # the filter is the third output (index starts from zero)
        output_featureSet[2] = flist = FeatureList()

        frameSampleStart = 0
        # The for loop uses the iterator protocol, which works on both
        # Python 2 and 3 (unlike the Python 2 only .next() method).
        for filterRow in self.filterIter:
            f = Feature()
            f.hasTimestamp = True
            f.timestamp = frame2RealTime(frameSampleStart,self.m_inputSampleRate)
            f.values = filterRow
            flist.append(f)
            frameSampleStart += self.m_stepSize

        return output_featureSet
| 392 |
