Mercurial > hg > vampy
diff Example VamPy plugins/test/PyMFCC_oldstyle.py @ 37:27bab3a16c9a vampy2final
new branch Vampy2final
author | fazekasgy |
---|---|
date | Mon, 05 Oct 2009 11:28:00 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Example VamPy plugins/test/PyMFCC_oldstyle.py Mon Oct 05 11:28:00 2009 +0000 @@ -0,0 +1,361 @@ +'''PyMFCC_oldstyle.py - This example Vampy plugin demonstrates +how to return sprectrogram-like features. + +This plugin uses backward compatible syntax and +no extension module. + +Centre for Digital Music, Queen Mary University of London. +Copyright 2006 Gyorgy Fazekas, QMUL. +(See Vamp API for licence information.) + +Constants for Mel frequency conversion and filter +centre calculation are taken from the GNU GPL licenced +Freespeech library. Copyright (C) 1999 Jean-Marc Valin +''' + +import sys,numpy +from numpy import log,exp,floor,sum +from numpy import * +from numpy.fft import * + + +class melScaling(object): + + def __init__(self,sampleRate,inputSize,numBands,minHz = 0,maxHz = None): + '''Initialise frequency warping and DCT matrix. + Parameters: + sampleRate: audio sample rate + inputSize: length of magnitude spectrum (half of FFT size assumed) + numBands: number of mel Bands (MFCCs) + minHz: lower bound of warping (default = DC) + maxHz: higher bound of warping (default = Nyquist frequency) + ''' + self.sampleRate = sampleRate + self.NqHz = sampleRate / 2.0 + self.minHz = minHz + if maxHz is None : maxHz = self.NqHz + self.maxHz = maxHz + self.inputSize = inputSize + self.numBands = numBands + self.valid = False + self.updated = False + # print '\n\n>>Plugin initialised with sample rate: %s<<\n\n' %self.sampleRate + # print 'minHz:%s\nmaxHz:%s\n' %(self.minHz,self.maxHz) + + + def update(self): + # make sure this will run only once if called from a vamp process + + if self.updated: return self.valid + self.updated = True + self.valid = False + print 'Updating parameters and recalculating filters: ' + print 'Nyquist: ',self.NqHz + + if self.maxHz > self.NqHz : + raise Exception('Maximum frequency must be smaller than the Nyquist frequency') + + self.maxMel = 1000*log(1+self.maxHz/700.0)/log(1+1000.0/700.0) + self.minMel = 1000*log(1+self.minHz/700.0)/log(1+1000.0/700.0) + print 'minHz:%s\nmaxHz:%s\nminMel:%s\nmaxMel:%s\n' %(self.minHz,self.maxHz,self.minMel,self.maxMel) + self.filterMatrix = self.getFilterMatrix(self.inputSize,self.numBands) + self.DCTMatrix = self.getDCTMatrix(self.numBands) + self.filterIter = self.filterMatrix.__iter__() + self.valid = True + return self.valid + + # try : + # self.maxMel = 1000*log(1+self.maxHz/700.0)/log(1+1000.0/700.0) + # self.minMel = 1000*log(1+self.minHz/700.0)/log(1+1000.0/700.0) + # self.filterMatrix = self.getFilterMatrix(self.inputSize,self.numBands) + # self.DCTMatrix = self.getDCTMatrix(self.numBands) + # self.filterIter = self.filterMatrix.__iter__() + # self.valid = True + # return True + # except : + # print "Invalid parameter setting encountered in MelScaling class." + # return False + # return True + + def getFilterCentres(self,inputSize,numBands): + '''Calculate Mel filter centres around FFT bins. + This function calculates two extra bands at the edges for + finding the starting and end point of the first and last + actual filters.''' + centresMel = numpy.array(xrange(numBands+2)) * (self.maxMel-self.minMel)/(numBands+1) + self.minMel + centresBin = numpy.floor(0.5 + 700.0*inputSize*(exp(centresMel*log(1+1000.0/700.0)/1000.0)-1)/self.NqHz) + return numpy.array(centresBin,int) + + def getFilterMatrix(self,inputSize,numBands): + '''Compose the Mel scaling matrix.''' + filterMatrix = numpy.zeros((numBands,inputSize)) + self.filterCentres = self.getFilterCentres(inputSize,numBands) + for i in xrange(numBands) : + start,centre,end = self.filterCentres[i:i+3] + self.setFilter(filterMatrix[i],start,centre,end) + return filterMatrix.transpose() + + def setFilter(self,filt,filterStart,filterCentre,filterEnd): + '''Calculate a single Mel filter.''' + k1 = numpy.float32(filterCentre-filterStart) + k2 = numpy.float32(filterEnd-filterCentre) + up = (numpy.array(xrange(filterStart,filterCentre))-filterStart)/k1 + dn = (filterEnd-numpy.array(xrange(filterCentre,filterEnd)))/k2 + filt[filterStart:filterCentre] = up + filt[filterCentre:filterEnd] = dn + + def warpSpectrum(self,magnitudeSpectrum): + '''Compute the Mel scaled spectrum.''' + return numpy.dot(magnitudeSpectrum,self.filterMatrix) + + def getDCTMatrix(self,size): + '''Calculate the square DCT transform matrix. Results are + equivalent to Matlab dctmtx(n) but with 64 bit precision.''' + DCTmx = numpy.array(xrange(size),numpy.float64).repeat(size).reshape(size,size) + DCTmxT = numpy.pi * (DCTmx.transpose()+0.5) / size + DCTmxT = (1.0/sqrt( size / 2.0)) * cos(DCTmx * DCTmxT) + DCTmxT[0] = DCTmxT[0] * (sqrt(2.0)/2.0) + return DCTmxT + + def dct(self,data_matrix): + '''Compute DCT of input matrix.''' + return numpy.dot(self.DCTMatrix,data_matrix) + + def getMFCCs(self,warpedSpectrum,cn=True): + '''Compute MFCC coefficients from Mel warped magnitude spectrum.''' + mfccs=self.dct(numpy.log(warpedSpectrum)) + if cn is False : mfccs[0] = 0.0 + return mfccs + + +class PyMFCC_oldstyle(melScaling): + + def __init__(self,inputSampleRate): + self.vampy_flags = 1 # vf_DEBUG = 1 + self.m_inputSampleRate = inputSampleRate + self.m_stepSize = 1024 + self.m_blockSize = 2048 + self.m_channels = 1 + self.numBands = 40 + self.cnull = 1 + melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize/2,self.numBands) + + def initialise(self,channels,stepSize,blockSize): + self.m_channels = channels + self.m_stepSize = stepSize + self.m_blockSize = blockSize + self.window = numpy.hamming(blockSize) + melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize/2,self.numBands) + return True + + def getMaker(self): + return 'Vampy Test Plugins' + + def getCopyright(self): + return 'Plugin By George Fazekas' + + def getName(self): + return 'Vampy Old Style MFCC Plugin' + + def getIdentifier(self): + return 'vampy-mfcc-test-old' + + def getDescription(self): + return 'A simple MFCC plugin. (using the old syntax)' + + def getMaxChannelCount(self): + return 1 + + def getInputDomain(self): + return 'TimeDomain' + + def getPreferredBlockSize(self): + return 2048 + + def getPreferredStepSize(self): + return 512 + + def getOutputDescriptors(self): + + Generic={ + 'hasFixedBinCount':True, + 'binCount':int(self.numBands)-self.cnull, + 'hasKnownExtents':False, + 'isQuantized':True, + 'sampleType':'OneSamplePerStep' + } + + MFCC=Generic.copy() + MFCC.update({ + 'identifier':'mfccs', + 'name':'MFCCs', + 'description':'MFCC Coefficients', + 'binNames':map(lambda x: 'C '+str(x),range(self.cnull,int(self.numBands))), + 'unit':'' + }) + + warpedSpectrum=Generic.copy() + warpedSpectrum.update({ + 'identifier':'warped-fft', + 'name':'Mel Scaled Spectrum', + 'description':'Mel Scaled Magnitide Spectrum', + 'unit':'Mel' + }) + + melFilter=Generic.copy() + melFilter.update({ + 'identifier':'mel-filter', + 'name':'Mel Filter Matrix', + 'description':'Returns the created filter matrix.', + 'sampleType':'FixedSampleRate', + 'sampleRate':self.m_inputSampleRate/self.m_stepSize, + 'unit':'' + }) + + return [MFCC,warpedSpectrum,melFilter] + + def getParameterDescriptors(self): + melbands = { + 'identifier':'melbands', + 'name':'Number of bands (coefficients)', + 'description':'Set the number of coefficients.', + 'unit':'', + 'minValue':2.0, + 'maxValue':128.0, + 'defaultValue':40.0, + 'isQuantized':True, + 'quantizeStep':1.0 + } + + cnull = { + 'identifier':'cnull', + 'name':'Return C0', + 'description':'Select if the DC coefficient is required.', + 'unit':'', + 'minValue':0.0, + 'maxValue':1.0, + 'defaultValue':0.0, + 'isQuantized':True, + 'quantizeStep':1.0 + } + + minHz = { + 'identifier':'minHz', + 'name':'minimum frequency', + 'description':'Set the lower frequency bound.', + 'unit':'Hz', + 'minValue':0.0, + 'maxValue':24000.0, + 'defaultValue':0.0, + 'isQuantized':True, + 'quantizeStep':1.0 + } + + maxHz = { + 'identifier':'maxHz', + 'name':'maximum frequency', + 'description':'Set the upper frequency bound.', + 'unit':'Hz', + 'minValue':100.0, + 'maxValue':24000.0, + 'defaultValue':11025.0, + 'isQuantized':True, + 'quantizeStep':100.0 + } + + return [melbands,minHz,maxHz,cnull] + + def setParameter(self,paramid,newval): + self.valid = False + if paramid == 'minHz' : + if newval < self.maxHz and newval < self.NqHz : + self.minHz = float(newval) + print 'minHz: ', self.minHz + if paramid == 'maxHz' : + print 'trying to set maxHz to: ',newval + if newval < self.NqHz and newval > self.minHz+1000 : + self.maxHz = float(newval) + else : + self.maxHz = self.NqHz + print 'set to: ',self.maxHz + if paramid == 'cnull' : + self.cnull = int(not int(newval)) + if paramid == 'melbands' : + self.numBands = int(newval) + return + + def getParameter(self,paramid): + if paramid == 'minHz' : + return float(self.minHz) + if paramid == 'maxHz' : + return float(self.maxHz) + if paramid == 'cnull' : + return float(not int(self.cnull)) + if paramid == 'melbands' : + return float(self.numBands) + else: + return 0.0 + + def processN(self,membuffer,frameSampleStart): + + # recalculate the filter and DCT matrices if needed + if not self.update() : return [] + + fftsize = self.m_blockSize + audioSamples = frombuffer(membuffer[0],float32) + + complexSpectrum = fft(self.window*audioSamples,fftsize) + #complexSpectrum = frombuffer(membuffer[0],complex64,-1,8) + + magnitudeSpectrum = abs(complexSpectrum)[0:fftsize/2] / (fftsize/2) + melSpectrum = self.warpSpectrum(magnitudeSpectrum) + melCepstrum = self.getMFCCs(melSpectrum,cn=True) + + output_melCepstrum = [{ + 'hasTimestamp':False, + 'values':melCepstrum[self.cnull:].tolist() + }] + + output_melSpectrum = [{ + 'hasTimestamp':False, + 'values':melSpectrum.tolist() + }] + + return [output_melCepstrum,output_melSpectrum,[]] + + + def getRemainingFeatures(self): + if not self.update() : return [] + frameSampleStart = 0 + output_melFilter = [] + + while True: + try : + melFilter = self.filterIter.next() + output_melFilter.append({ + 'hasTimestamp':True, + 'timeStamp':frameSampleStart, + 'values':melFilter.tolist() + }) + frameSampleStart += self.m_stepSize + except StopIteration : + break; + + return [[],[],output_melFilter] + + +# ============================================ +# Simple Unit Tests +# ============================================ + +def main(): + + dct = melScaling(44100,2048,numBands=4) + dct.update() + print dct.DCTMatrix + # print dct.getMFCCs(numpy.array([0.0,0.1,0.0,-0.1],numpy.float64)) + sys.exit(-1) + +if __name__ == '__main__': + main() +