fazekasgy@37: '''PyMFCC_oldstyle.py - This example Vampy plugin demonstrates fazekasgy@37: how to return sprectrogram-like features. fazekasgy@37: fazekasgy@37: This plugin uses backward compatible syntax and fazekasgy@37: no extension module. fazekasgy@37: fazekasgy@37: Centre for Digital Music, Queen Mary University of London. fazekasgy@37: Copyright 2006 Gyorgy Fazekas, QMUL. fazekasgy@37: (See Vamp API for licence information.) fazekasgy@37: fazekasgy@37: Constants for Mel frequency conversion and filter fazekasgy@37: centre calculation are taken from the GNU GPL licenced fazekasgy@37: Freespeech library. Copyright (C) 1999 Jean-Marc Valin fazekasgy@37: ''' fazekasgy@37: fazekasgy@37: import sys,numpy fazekasgy@37: from numpy import log,exp,floor,sum fazekasgy@37: from numpy import * fazekasgy@37: from numpy.fft import * fazekasgy@37: fazekasgy@37: fazekasgy@37: class melScaling(object): fazekasgy@37: fazekasgy@37: def __init__(self,sampleRate,inputSize,numBands,minHz = 0,maxHz = None): fazekasgy@37: '''Initialise frequency warping and DCT matrix. fazekasgy@37: Parameters: fazekasgy@37: sampleRate: audio sample rate fazekasgy@37: inputSize: length of magnitude spectrum (half of FFT size assumed) fazekasgy@37: numBands: number of mel Bands (MFCCs) fazekasgy@37: minHz: lower bound of warping (default = DC) fazekasgy@37: maxHz: higher bound of warping (default = Nyquist frequency) fazekasgy@37: ''' fazekasgy@37: self.sampleRate = sampleRate fazekasgy@37: self.NqHz = sampleRate / 2.0 fazekasgy@37: self.minHz = minHz fazekasgy@37: if maxHz is None : maxHz = self.NqHz fazekasgy@37: self.maxHz = maxHz fazekasgy@37: self.inputSize = inputSize fazekasgy@37: self.numBands = numBands fazekasgy@37: self.valid = False fazekasgy@37: self.updated = False fazekasgy@37: # print '\n\n>>Plugin initialised with sample rate: %s<<\n\n' %self.sampleRate fazekasgy@37: # print 'minHz:%s\nmaxHz:%s\n' %(self.minHz,self.maxHz) fazekasgy@37: fazekasgy@37: fazekasgy@37: def update(self): fazekasgy@37: # make sure this will run only once if called from a vamp process fazekasgy@37: fazekasgy@37: if self.updated: return self.valid fazekasgy@37: self.updated = True fazekasgy@37: self.valid = False fazekasgy@37: print 'Updating parameters and recalculating filters: ' fazekasgy@37: print 'Nyquist: ',self.NqHz fazekasgy@37: fazekasgy@37: if self.maxHz > self.NqHz : fazekasgy@37: raise Exception('Maximum frequency must be smaller than the Nyquist frequency') fazekasgy@37: fazekasgy@37: self.maxMel = 1000*log(1+self.maxHz/700.0)/log(1+1000.0/700.0) fazekasgy@37: self.minMel = 1000*log(1+self.minHz/700.0)/log(1+1000.0/700.0) fazekasgy@37: print 'minHz:%s\nmaxHz:%s\nminMel:%s\nmaxMel:%s\n' %(self.minHz,self.maxHz,self.minMel,self.maxMel) fazekasgy@37: self.filterMatrix = self.getFilterMatrix(self.inputSize,self.numBands) fazekasgy@37: self.DCTMatrix = self.getDCTMatrix(self.numBands) fazekasgy@37: self.filterIter = self.filterMatrix.__iter__() fazekasgy@37: self.valid = True fazekasgy@37: return self.valid fazekasgy@37: fazekasgy@37: # try : fazekasgy@37: # self.maxMel = 1000*log(1+self.maxHz/700.0)/log(1+1000.0/700.0) fazekasgy@37: # self.minMel = 1000*log(1+self.minHz/700.0)/log(1+1000.0/700.0) fazekasgy@37: # self.filterMatrix = self.getFilterMatrix(self.inputSize,self.numBands) fazekasgy@37: # self.DCTMatrix = self.getDCTMatrix(self.numBands) fazekasgy@37: # self.filterIter = self.filterMatrix.__iter__() fazekasgy@37: # self.valid = True fazekasgy@37: # return True fazekasgy@37: # except : fazekasgy@37: # print "Invalid parameter setting encountered in MelScaling class." fazekasgy@37: # return False fazekasgy@37: # return True fazekasgy@37: fazekasgy@37: def getFilterCentres(self,inputSize,numBands): fazekasgy@37: '''Calculate Mel filter centres around FFT bins. fazekasgy@37: This function calculates two extra bands at the edges for fazekasgy@37: finding the starting and end point of the first and last fazekasgy@37: actual filters.''' fazekasgy@37: centresMel = numpy.array(xrange(numBands+2)) * (self.maxMel-self.minMel)/(numBands+1) + self.minMel fazekasgy@37: centresBin = numpy.floor(0.5 + 700.0*inputSize*(exp(centresMel*log(1+1000.0/700.0)/1000.0)-1)/self.NqHz) fazekasgy@37: return numpy.array(centresBin,int) fazekasgy@37: fazekasgy@37: def getFilterMatrix(self,inputSize,numBands): fazekasgy@37: '''Compose the Mel scaling matrix.''' fazekasgy@37: filterMatrix = numpy.zeros((numBands,inputSize)) fazekasgy@37: self.filterCentres = self.getFilterCentres(inputSize,numBands) fazekasgy@37: for i in xrange(numBands) : fazekasgy@37: start,centre,end = self.filterCentres[i:i+3] fazekasgy@37: self.setFilter(filterMatrix[i],start,centre,end) fazekasgy@37: return filterMatrix.transpose() fazekasgy@37: fazekasgy@37: def setFilter(self,filt,filterStart,filterCentre,filterEnd): fazekasgy@37: '''Calculate a single Mel filter.''' fazekasgy@37: k1 = numpy.float32(filterCentre-filterStart) fazekasgy@37: k2 = numpy.float32(filterEnd-filterCentre) fazekasgy@37: up = (numpy.array(xrange(filterStart,filterCentre))-filterStart)/k1 fazekasgy@37: dn = (filterEnd-numpy.array(xrange(filterCentre,filterEnd)))/k2 fazekasgy@37: filt[filterStart:filterCentre] = up fazekasgy@37: filt[filterCentre:filterEnd] = dn fazekasgy@37: fazekasgy@37: def warpSpectrum(self,magnitudeSpectrum): fazekasgy@37: '''Compute the Mel scaled spectrum.''' fazekasgy@37: return numpy.dot(magnitudeSpectrum,self.filterMatrix) fazekasgy@37: fazekasgy@37: def getDCTMatrix(self,size): fazekasgy@37: '''Calculate the square DCT transform matrix. Results are fazekasgy@37: equivalent to Matlab dctmtx(n) but with 64 bit precision.''' fazekasgy@37: DCTmx = numpy.array(xrange(size),numpy.float64).repeat(size).reshape(size,size) fazekasgy@37: DCTmxT = numpy.pi * (DCTmx.transpose()+0.5) / size fazekasgy@37: DCTmxT = (1.0/sqrt( size / 2.0)) * cos(DCTmx * DCTmxT) fazekasgy@37: DCTmxT[0] = DCTmxT[0] * (sqrt(2.0)/2.0) fazekasgy@37: return DCTmxT fazekasgy@37: fazekasgy@37: def dct(self,data_matrix): fazekasgy@37: '''Compute DCT of input matrix.''' fazekasgy@37: return numpy.dot(self.DCTMatrix,data_matrix) fazekasgy@37: fazekasgy@37: def getMFCCs(self,warpedSpectrum,cn=True): fazekasgy@37: '''Compute MFCC coefficients from Mel warped magnitude spectrum.''' fazekasgy@37: mfccs=self.dct(numpy.log(warpedSpectrum)) fazekasgy@37: if cn is False : mfccs[0] = 0.0 fazekasgy@37: return mfccs fazekasgy@37: fazekasgy@37: fazekasgy@37: class PyMFCC_oldstyle(melScaling): fazekasgy@37: fazekasgy@37: def __init__(self,inputSampleRate): fazekasgy@37: self.vampy_flags = 1 # vf_DEBUG = 1 fazekasgy@37: self.m_inputSampleRate = inputSampleRate fazekasgy@37: self.m_stepSize = 1024 fazekasgy@37: self.m_blockSize = 2048 fazekasgy@37: self.m_channels = 1 fazekasgy@37: self.numBands = 40 fazekasgy@37: self.cnull = 1 fazekasgy@37: melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize/2,self.numBands) fazekasgy@37: fazekasgy@37: def initialise(self,channels,stepSize,blockSize): fazekasgy@37: self.m_channels = channels fazekasgy@37: self.m_stepSize = stepSize fazekasgy@37: self.m_blockSize = blockSize fazekasgy@37: self.window = numpy.hamming(blockSize) fazekasgy@37: melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize/2,self.numBands) fazekasgy@37: return True fazekasgy@37: fazekasgy@37: def getMaker(self): fazekasgy@37: return 'Vampy Test Plugins' fazekasgy@37: fazekasgy@37: def getCopyright(self): fazekasgy@37: return 'Plugin By George Fazekas' fazekasgy@37: fazekasgy@37: def getName(self): fazekasgy@37: return 'Vampy Old Style MFCC Plugin' fazekasgy@37: fazekasgy@37: def getIdentifier(self): fazekasgy@37: return 'vampy-mfcc-test-old' fazekasgy@37: fazekasgy@37: def getDescription(self): fazekasgy@37: return 'A simple MFCC plugin. (using the old syntax)' fazekasgy@37: fazekasgy@37: def getMaxChannelCount(self): fazekasgy@37: return 1 fazekasgy@37: fazekasgy@37: def getInputDomain(self): fazekasgy@37: return 'TimeDomain' fazekasgy@37: fazekasgy@37: def getPreferredBlockSize(self): fazekasgy@37: return 2048 fazekasgy@37: fazekasgy@37: def getPreferredStepSize(self): fazekasgy@37: return 512 fazekasgy@37: fazekasgy@37: def getOutputDescriptors(self): fazekasgy@37: fazekasgy@37: Generic={ fazekasgy@37: 'hasFixedBinCount':True, fazekasgy@37: 'binCount':int(self.numBands)-self.cnull, fazekasgy@37: 'hasKnownExtents':False, fazekasgy@37: 'isQuantized':True, fazekasgy@37: 'sampleType':'OneSamplePerStep' fazekasgy@37: } fazekasgy@37: fazekasgy@37: MFCC=Generic.copy() fazekasgy@37: MFCC.update({ fazekasgy@37: 'identifier':'mfccs', fazekasgy@37: 'name':'MFCCs', fazekasgy@37: 'description':'MFCC Coefficients', fazekasgy@37: 'binNames':map(lambda x: 'C '+str(x),range(self.cnull,int(self.numBands))), fazekasgy@37: 'unit':'' fazekasgy@37: }) fazekasgy@37: fazekasgy@37: warpedSpectrum=Generic.copy() fazekasgy@37: warpedSpectrum.update({ fazekasgy@37: 'identifier':'warped-fft', fazekasgy@37: 'name':'Mel Scaled Spectrum', fazekasgy@37: 'description':'Mel Scaled Magnitide Spectrum', fazekasgy@37: 'unit':'Mel' fazekasgy@37: }) fazekasgy@37: fazekasgy@37: melFilter=Generic.copy() fazekasgy@37: melFilter.update({ fazekasgy@37: 'identifier':'mel-filter', fazekasgy@37: 'name':'Mel Filter Matrix', fazekasgy@37: 'description':'Returns the created filter matrix.', fazekasgy@37: 'sampleType':'FixedSampleRate', fazekasgy@37: 'sampleRate':self.m_inputSampleRate/self.m_stepSize, fazekasgy@37: 'unit':'' fazekasgy@37: }) fazekasgy@37: fazekasgy@37: return [MFCC,warpedSpectrum,melFilter] fazekasgy@37: fazekasgy@37: def getParameterDescriptors(self): fazekasgy@37: melbands = { fazekasgy@37: 'identifier':'melbands', fazekasgy@37: 'name':'Number of bands (coefficients)', fazekasgy@37: 'description':'Set the number of coefficients.', fazekasgy@37: 'unit':'', fazekasgy@37: 'minValue':2.0, fazekasgy@37: 'maxValue':128.0, fazekasgy@37: 'defaultValue':40.0, fazekasgy@37: 'isQuantized':True, fazekasgy@37: 'quantizeStep':1.0 fazekasgy@37: } fazekasgy@37: fazekasgy@37: cnull = { fazekasgy@37: 'identifier':'cnull', fazekasgy@37: 'name':'Return C0', fazekasgy@37: 'description':'Select if the DC coefficient is required.', fazekasgy@37: 'unit':'', fazekasgy@37: 'minValue':0.0, fazekasgy@37: 'maxValue':1.0, fazekasgy@37: 'defaultValue':0.0, fazekasgy@37: 'isQuantized':True, fazekasgy@37: 'quantizeStep':1.0 fazekasgy@37: } fazekasgy@37: fazekasgy@37: minHz = { fazekasgy@37: 'identifier':'minHz', fazekasgy@37: 'name':'minimum frequency', fazekasgy@37: 'description':'Set the lower frequency bound.', fazekasgy@37: 'unit':'Hz', fazekasgy@37: 'minValue':0.0, fazekasgy@37: 'maxValue':24000.0, fazekasgy@37: 'defaultValue':0.0, fazekasgy@37: 'isQuantized':True, fazekasgy@37: 'quantizeStep':1.0 fazekasgy@37: } fazekasgy@37: fazekasgy@37: maxHz = { fazekasgy@37: 'identifier':'maxHz', fazekasgy@37: 'name':'maximum frequency', fazekasgy@37: 'description':'Set the upper frequency bound.', fazekasgy@37: 'unit':'Hz', fazekasgy@37: 'minValue':100.0, fazekasgy@37: 'maxValue':24000.0, fazekasgy@37: 'defaultValue':11025.0, fazekasgy@37: 'isQuantized':True, fazekasgy@37: 'quantizeStep':100.0 fazekasgy@37: } fazekasgy@37: fazekasgy@37: return [melbands,minHz,maxHz,cnull] fazekasgy@37: fazekasgy@37: def setParameter(self,paramid,newval): fazekasgy@37: self.valid = False fazekasgy@37: if paramid == 'minHz' : fazekasgy@37: if newval < self.maxHz and newval < self.NqHz : fazekasgy@37: self.minHz = float(newval) fazekasgy@37: print 'minHz: ', self.minHz fazekasgy@37: if paramid == 'maxHz' : fazekasgy@37: print 'trying to set maxHz to: ',newval fazekasgy@37: if newval < self.NqHz and newval > self.minHz+1000 : fazekasgy@37: self.maxHz = float(newval) fazekasgy@37: else : fazekasgy@37: self.maxHz = self.NqHz fazekasgy@37: print 'set to: ',self.maxHz fazekasgy@37: if paramid == 'cnull' : fazekasgy@37: self.cnull = int(not int(newval)) fazekasgy@37: if paramid == 'melbands' : fazekasgy@37: self.numBands = int(newval) fazekasgy@37: return fazekasgy@37: fazekasgy@37: def getParameter(self,paramid): fazekasgy@37: if paramid == 'minHz' : fazekasgy@37: return float(self.minHz) fazekasgy@37: if paramid == 'maxHz' : fazekasgy@37: return float(self.maxHz) fazekasgy@37: if paramid == 'cnull' : fazekasgy@37: return float(not int(self.cnull)) fazekasgy@37: if paramid == 'melbands' : fazekasgy@37: return float(self.numBands) fazekasgy@37: else: fazekasgy@37: return 0.0 fazekasgy@37: fazekasgy@37: def processN(self,membuffer,frameSampleStart): fazekasgy@37: fazekasgy@37: # recalculate the filter and DCT matrices if needed fazekasgy@37: if not self.update() : return [] fazekasgy@37: fazekasgy@37: fftsize = self.m_blockSize fazekasgy@37: audioSamples = frombuffer(membuffer[0],float32) fazekasgy@37: fazekasgy@37: complexSpectrum = fft(self.window*audioSamples,fftsize) fazekasgy@37: #complexSpectrum = frombuffer(membuffer[0],complex64,-1,8) fazekasgy@37: fazekasgy@37: magnitudeSpectrum = abs(complexSpectrum)[0:fftsize/2] / (fftsize/2) fazekasgy@37: melSpectrum = self.warpSpectrum(magnitudeSpectrum) fazekasgy@37: melCepstrum = self.getMFCCs(melSpectrum,cn=True) fazekasgy@37: fazekasgy@37: output_melCepstrum = [{ fazekasgy@37: 'hasTimestamp':False, fazekasgy@37: 'values':melCepstrum[self.cnull:].tolist() fazekasgy@37: }] fazekasgy@37: fazekasgy@37: output_melSpectrum = [{ fazekasgy@37: 'hasTimestamp':False, fazekasgy@37: 'values':melSpectrum.tolist() fazekasgy@37: }] fazekasgy@37: fazekasgy@37: return [output_melCepstrum,output_melSpectrum,[]] fazekasgy@37: fazekasgy@37: fazekasgy@37: def getRemainingFeatures(self): fazekasgy@37: if not self.update() : return [] fazekasgy@37: frameSampleStart = 0 fazekasgy@37: output_melFilter = [] fazekasgy@37: fazekasgy@37: while True: fazekasgy@37: try : fazekasgy@37: melFilter = self.filterIter.next() fazekasgy@37: output_melFilter.append({ fazekasgy@37: 'hasTimestamp':True, fazekasgy@37: 'timeStamp':frameSampleStart, fazekasgy@37: 'values':melFilter.tolist() fazekasgy@37: }) fazekasgy@37: frameSampleStart += self.m_stepSize fazekasgy@37: except StopIteration : fazekasgy@37: break; fazekasgy@37: fazekasgy@37: return [[],[],output_melFilter] fazekasgy@37: fazekasgy@37: fazekasgy@37: # ============================================ fazekasgy@37: # Simple Unit Tests fazekasgy@37: # ============================================ fazekasgy@37: fazekasgy@37: def main(): fazekasgy@37: fazekasgy@37: dct = melScaling(44100,2048,numBands=4) fazekasgy@37: dct.update() fazekasgy@37: print dct.DCTMatrix fazekasgy@37: # print dct.getMFCCs(numpy.array([0.0,0.1,0.0,-0.1],numpy.float64)) fazekasgy@37: sys.exit(-1) fazekasgy@37: fazekasgy@37: if __name__ == '__main__': fazekasgy@37: main() fazekasgy@37: