fazekasgy@37: '''PyMFCC.py - This example Vampy plugin demonstrates fazekasgy@37: how to return sprectrogram-like features and how to return fazekasgy@37: data using the getRemainingFeatures() function. fazekasgy@37: fazekasgy@37: The plugin has frequency domain input and is using the fazekasgy@37: numpy array interface. (Flag: vf_ARRAY) fazekasgy@37: fazekasgy@37: Outputs: fazekasgy@37: 1) 2-128 MFCC coefficients fazekasgy@37: 2) Mel-warped spectrum used for the MFCC computation fazekasgy@37: 3) Filter matrix used for Mel scaling fazekasgy@37: fazekasgy@37: Centre for Digital Music, Queen Mary University of London. fazekasgy@37: Copyright (C) 2009 Gyorgy Fazekas, QMUL. (See Vamp sources fazekasgy@37: for licence information.) fazekasgy@37: fazekasgy@37: Constants for Mel frequency conversion and filter fazekasgy@37: centre calculation are taken from the GNU GPL licenced fazekasgy@37: Freespeech library. Copyright (C) 1999 Jean-Marc Valin fazekasgy@37: ''' fazekasgy@37: fazekasgy@37: import sys,numpy,vampy fazekasgy@37: from numpy import abs,log,exp,floor,sum,sqrt,cos,hstack fazekasgy@37: from numpy.fft import * fazekasgy@37: from vampy import * fazekasgy@37: fazekasgy@37: fazekasgy@37: class melScaling(object): fazekasgy@37: fazekasgy@37: def __init__(self,sampleRate,inputSize,numBands,minHz = 0,maxHz = None): fazekasgy@37: '''Initialise frequency warping and DCT matrix. fazekasgy@37: Parameters: fazekasgy@37: sampleRate: audio sample rate fazekasgy@37: inputSize: length of magnitude spectrum (half of FFT size assumed) fazekasgy@37: numBands: number of mel Bands (MFCCs) fazekasgy@37: minHz: lower bound of warping (default = DC) fazekasgy@37: maxHz: higher bound of warping (default = Nyquist frequency) fazekasgy@37: ''' fazekasgy@37: self.sampleRate = sampleRate fazekasgy@37: self.NqHz = sampleRate / 2.0 fazekasgy@37: self.minHz = minHz fazekasgy@37: if maxHz is None : maxHz = self.NqHz fazekasgy@37: self.maxHz = maxHz fazekasgy@37: self.inputSize = inputSize fazekasgy@37: self.numBands = numBands fazekasgy@37: self.valid = False fazekasgy@37: self.updated = False fazekasgy@37: fazekasgy@37: def update(self): fazekasgy@37: # make sure this will run only once fazekasgy@37: # if called from a vamp process fazekasgy@37: if self.updated: return self.valid fazekasgy@37: self.updated = True fazekasgy@37: self.valid = False fazekasgy@37: print 'Updating parameters and recalculating filters: ' fazekasgy@37: print 'Nyquist: ',self.NqHz fazekasgy@37: fazekasgy@37: if self.maxHz > self.NqHz : fazekasgy@37: raise Exception('Maximum frequency must be smaller than the Nyquist frequency') fazekasgy@37: fazekasgy@37: self.maxMel = 1000*log(1+self.maxHz/700.0)/log(1+1000.0/700.0) fazekasgy@37: self.minMel = 1000*log(1+self.minHz/700.0)/log(1+1000.0/700.0) fazekasgy@37: print 'minHz:%s\nmaxHz:%s\nminMel:%s\nmaxMel:%s\n' \ fazekasgy@37: %(self.minHz,self.maxHz,self.minMel,self.maxMel) fazekasgy@37: self.filterMatrix = self.getFilterMatrix(self.inputSize,self.numBands) fazekasgy@37: self.DCTMatrix = self.getDCTMatrix(self.numBands) fazekasgy@37: self.filterIter = self.filterMatrix.__iter__() fazekasgy@37: self.valid = True fazekasgy@37: return self.valid fazekasgy@37: fazekasgy@37: def getFilterCentres(self,inputSize,numBands): fazekasgy@37: '''Calculate Mel filter centres around FFT bins. fazekasgy@37: This function calculates two extra bands at the edges for fazekasgy@37: finding the starting and end point of the first and last fazekasgy@37: actual filters.''' fazekasgy@37: centresMel = numpy.array(xrange(numBands+2)) * (self.maxMel-self.minMel)/(numBands+1) + self.minMel fazekasgy@37: centresBin = numpy.floor(0.5 + 700.0*inputSize*(exp(centresMel*log(1+1000.0/700.0)/1000.0)-1)/self.NqHz) fazekasgy@37: return numpy.array(centresBin,int) fazekasgy@37: fazekasgy@37: def getFilterMatrix(self,inputSize,numBands): fazekasgy@37: '''Compose the Mel scaling matrix.''' fazekasgy@37: filterMatrix = numpy.zeros((numBands,inputSize)) fazekasgy@37: self.filterCentres = self.getFilterCentres(inputSize,numBands) fazekasgy@37: for i in xrange(numBands) : fazekasgy@37: start,centre,end = self.filterCentres[i:i+3] fazekasgy@37: self.setFilter(filterMatrix[i],start,centre,end) fazekasgy@37: return filterMatrix.transpose() fazekasgy@37: fazekasgy@37: def setFilter(self,filt,filterStart,filterCentre,filterEnd): fazekasgy@37: '''Calculate a single Mel filter.''' fazekasgy@37: k1 = numpy.float32(filterCentre-filterStart) fazekasgy@37: k2 = numpy.float32(filterEnd-filterCentre) fazekasgy@37: up = (numpy.array(xrange(filterStart,filterCentre))-filterStart)/k1 fazekasgy@37: dn = (filterEnd-numpy.array(xrange(filterCentre,filterEnd)))/k2 fazekasgy@37: filt[filterStart:filterCentre] = up fazekasgy@37: filt[filterCentre:filterEnd] = dn fazekasgy@37: fazekasgy@37: def warpSpectrum(self,magnitudeSpectrum): fazekasgy@37: '''Compute the Mel scaled spectrum.''' fazekasgy@37: return numpy.dot(magnitudeSpectrum,self.filterMatrix) fazekasgy@37: fazekasgy@37: def getDCTMatrix(self,size): fazekasgy@37: '''Calculate the square DCT transform matrix. Results are fazekasgy@37: equivalent to Matlab dctmtx(n) with 64 bit precision.''' fazekasgy@37: DCTmx = numpy.array(xrange(size),numpy.float64).repeat(size).reshape(size,size) fazekasgy@37: DCTmxT = numpy.pi * (DCTmx.transpose()+0.5) / size fazekasgy@37: DCTmxT = (1.0/sqrt( size / 2.0)) * cos(DCTmx * DCTmxT) fazekasgy@37: DCTmxT[0] = DCTmxT[0] * (sqrt(2.0)/2.0) fazekasgy@37: return DCTmxT fazekasgy@37: fazekasgy@37: def dct(self,data_matrix): fazekasgy@37: '''Compute DCT of input matrix.''' fazekasgy@37: return numpy.dot(self.DCTMatrix,data_matrix) fazekasgy@37: fazekasgy@37: def getMFCCs(self,warpedSpectrum,cn=True): fazekasgy@37: '''Compute MFCC coefficients from Mel warped magnitude spectrum.''' fazekasgy@37: mfccs=self.dct(numpy.log(warpedSpectrum)) fazekasgy@37: if cn is False : mfccs[0] = 0.0 fazekasgy@37: return mfccs fazekasgy@37: fazekasgy@37: fazekasgy@37: class PyMFCC(melScaling): fazekasgy@37: fazekasgy@37: def __init__(self,inputSampleRate): fazekasgy@37: fazekasgy@37: # flags for setting some Vampy options fazekasgy@37: self.vampy_flags = vf_DEBUG | vf_ARRAY | vf_REALTIME fazekasgy@37: fazekasgy@37: self.m_inputSampleRate = int(inputSampleRate) fazekasgy@37: self.m_stepSize = 1024 fazekasgy@37: self.m_blockSize = 2048 fazekasgy@37: self.m_channels = 1 fazekasgy@37: self.numBands = 40 fazekasgy@37: self.cnull = 1 fazekasgy@37: self.two_ch = False fazekasgy@37: melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize/2,self.numBands) fazekasgy@37: fazekasgy@37: def initialise(self,channels,stepSize,blockSize): fazekasgy@37: self.m_channels = channels fazekasgy@37: self.m_stepSize = stepSize fazekasgy@37: self.m_blockSize = blockSize fazekasgy@37: self.window = numpy.hamming(blockSize) fazekasgy@37: melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize/2,self.numBands) fazekasgy@37: return True fazekasgy@37: fazekasgy@37: def getMaker(self): fazekasgy@37: return 'Vampy Example Plugins' fazekasgy@37: fazekasgy@37: def getCopyright(self): fazekasgy@37: return 'Plugin By George Fazekas' fazekasgy@37: fazekasgy@37: def getName(self): fazekasgy@37: return 'Vampy MFCC Plugin' fazekasgy@37: fazekasgy@37: def getIdentifier(self): fazekasgy@37: return 'vampy-mfcc' fazekasgy@37: fazekasgy@37: def getDescription(self): fazekasgy@37: return 'A simple MFCC plugin' fazekasgy@37: fazekasgy@37: def getMaxChannelCount(self): fazekasgy@37: return 2 fazekasgy@37: fazekasgy@37: def getInputDomain(self): fazekasgy@37: return FrequencyDomain #TimeDomain fazekasgy@37: fazekasgy@37: def getPreferredBlockSize(self): fazekasgy@37: return 2048 fazekasgy@37: fazekasgy@37: def getPreferredStepSize(self): fazekasgy@37: return 1024 fazekasgy@37: fazekasgy@37: def getOutputDescriptors(self): fazekasgy@37: fazekasgy@37: Generic = OutputDescriptor() fazekasgy@37: Generic.hasFixedBinCount=True fazekasgy@37: Generic.binCount=int(self.numBands)-self.cnull fazekasgy@37: Generic.hasKnownExtents=False fazekasgy@37: Generic.isQuantized=True fazekasgy@37: Generic.sampleType = OneSamplePerStep fazekasgy@37: fazekasgy@37: # note the inheritance of attributes (optional) fazekasgy@37: MFCC = OutputDescriptor(Generic) fazekasgy@37: MFCC.identifier = 'mfccs' fazekasgy@37: MFCC.name = 'MFCCs' fazekasgy@37: MFCC.description = 'MFCC Coefficients' fazekasgy@37: MFCC.binNames=map(lambda x: 'C '+str(x),range(self.cnull,int(self.numBands))) fazekasgy@37: if self.two_ch and self.m_channels == 2 : fazekasgy@37: MFCC.binNames *= 2 #repeat the list fazekasgy@37: MFCC.unit = None fazekasgy@37: if self.two_ch and self.m_channels == 2 : fazekasgy@37: MFCC.binCount = self.m_channels * (int(self.numBands)-self.cnull) fazekasgy@37: else : fazekasgy@37: MFCC.binCount = self.numBands-self.cnull fazekasgy@37: fazekasgy@37: warpedSpectrum = OutputDescriptor(Generic) fazekasgy@37: warpedSpectrum.identifier='warped-fft' fazekasgy@37: warpedSpectrum.name='Mel Scaled Spectrum' fazekasgy@37: warpedSpectrum.description='Mel Scaled Magnitide Spectrum' fazekasgy@37: warpedSpectrum.unit='Mel' fazekasgy@37: if self.two_ch and self.m_channels == 2 : fazekasgy@37: warpedSpectrum.binCount = self.m_channels * int(self.numBands) fazekasgy@37: else : fazekasgy@37: warpedSpectrum.binCount = self.numBands fazekasgy@37: fazekasgy@37: melFilter = OutputDescriptor(Generic) fazekasgy@37: melFilter.identifier = 'mel-filter-matrix' fazekasgy@37: melFilter.sampleType='FixedSampleRate' fazekasgy@37: melFilter.sampleRate=self.m_inputSampleRate/self.m_stepSize fazekasgy@37: melFilter.name='Mel Filter Matrix' fazekasgy@37: melFilter.description='Returns the created filter matrix in getRemainingFeatures.' fazekasgy@37: melFilter.unit = None fazekasgy@37: fazekasgy@37: return OutputList(MFCC,warpedSpectrum,melFilter) fazekasgy@37: fazekasgy@37: fazekasgy@37: def getParameterDescriptors(self): fazekasgy@37: fazekasgy@37: melbands = ParameterDescriptor() fazekasgy@37: melbands.identifier='melbands' fazekasgy@37: melbands.name='Number of bands (coefficients)' fazekasgy@37: melbands.description='Set the number of coefficients.' fazekasgy@37: melbands.unit = '' fazekasgy@37: melbands.minValue = 2 fazekasgy@37: melbands.maxValue = 128 fazekasgy@37: melbands.defaultValue = 40 fazekasgy@37: melbands.isQuantized = True fazekasgy@37: melbands.quantizeStep = 1 fazekasgy@37: fazekasgy@37: cnull = ParameterDescriptor() fazekasgy@37: cnull.identifier='cnull' fazekasgy@37: cnull.name='Return C0' fazekasgy@37: cnull.description='Select if the DC coefficient is required.' fazekasgy@37: cnull.unit = None fazekasgy@37: cnull.minValue = 0 fazekasgy@37: cnull.maxValue = 1 fazekasgy@37: cnull.defaultValue = 0 fazekasgy@37: cnull.isQuantized = True fazekasgy@37: cnull.quantizeStep = 1 fazekasgy@37: fazekasgy@37: two_ch = ParameterDescriptor(cnull) fazekasgy@37: two_ch.identifier='two_ch' fazekasgy@37: two_ch.name='Process channels separately' fazekasgy@37: two_ch.description='Process two channel files separately.' fazekasgy@37: two_ch.defaultValue = False fazekasgy@37: fazekasgy@37: minHz = ParameterDescriptor() fazekasgy@37: minHz.identifier='minHz' fazekasgy@37: minHz.name='minimum frequency' fazekasgy@37: minHz.description='Set the lower frequency bound.' fazekasgy@37: minHz.unit='Hz' fazekasgy@37: minHz.minValue = 0 fazekasgy@37: minHz.maxValue = 24000 fazekasgy@37: minHz.defaultValue = 0 fazekasgy@37: minHz.isQuantized = True fazekasgy@37: minHz.quantizeStep = 1.0 fazekasgy@37: fazekasgy@37: maxHz = ParameterDescriptor() fazekasgy@37: maxHz.identifier='maxHz' fazekasgy@37: maxHz.description='Set the upper frequency bound.' fazekasgy@37: maxHz.name='maximum frequency' fazekasgy@37: maxHz.unit='Hz' fazekasgy@37: maxHz.minValue = 100 fazekasgy@37: maxHz.maxValue = 24000 fazekasgy@37: maxHz.defaultValue = 11025 fazekasgy@37: maxHz.isQuantized = True fazekasgy@37: maxHz.quantizeStep = 100 fazekasgy@37: fazekasgy@37: return ParameterList(melbands,minHz,maxHz,cnull,two_ch) fazekasgy@37: fazekasgy@37: fazekasgy@37: def setParameter(self,paramid,newval): fazekasgy@37: self.valid = False fazekasgy@37: if paramid == 'minHz' : fazekasgy@37: if newval < self.maxHz and newval < self.NqHz : fazekasgy@37: self.minHz = float(newval) fazekasgy@37: if paramid == 'maxHz' : fazekasgy@37: if newval < self.NqHz and newval > self.minHz+1000 : fazekasgy@37: self.maxHz = float(newval) fazekasgy@37: else : fazekasgy@37: self.maxHz = self.NqHz fazekasgy@37: if paramid == 'cnull' : fazekasgy@37: self.cnull = int(not int(newval)) fazekasgy@37: if paramid == 'melbands' : fazekasgy@37: self.numBands = int(newval) fazekasgy@37: if paramid == 'two_ch' : fazekasgy@37: self.two_ch = bool(newval) fazekasgy@37: return None fazekasgy@37: fazekasgy@37: fazekasgy@37: def getParameter(self,paramid): fazekasgy@37: if paramid == 'minHz' : fazekasgy@37: return self.minHz fazekasgy@37: if paramid == 'maxHz' : fazekasgy@37: return self.maxHz fazekasgy@37: if paramid == 'cnull' : fazekasgy@37: return bool(not int(self.cnull)) fazekasgy@37: if paramid == 'melbands' : fazekasgy@37: return self.numBands fazekasgy@37: if paramid == 'two_ch' : fazekasgy@37: return self.two_ch fazekasgy@37: else: fazekasgy@37: return 0.0 fazekasgy@37: fazekasgy@37: # set numpy array process using the 'vf_ARRAY' flag in __init__() fazekasgy@37: # and RealTime time stamps using the 'vf_REALTIME' flag fazekasgy@37: def process(self,inputbuffers,timestamp): fazekasgy@37: fazekasgy@37: # calculate the filter and DCT matrices, check fazekasgy@37: # if they are computable given a set of parameters fazekasgy@37: # (we only do this once, when the process is called first) fazekasgy@37: if not self.update() : return None fazekasgy@37: fazekasgy@37: # if two channel processing is set, use process2ch fazekasgy@37: if self.m_channels == 2 and self.two_ch : fazekasgy@37: return self.process2ch(inputbuffers,timestamp) fazekasgy@37: fazekasgy@37: fftsize = self.m_blockSize fazekasgy@37: fazekasgy@37: if self.m_channels > 1 : fazekasgy@37: # take the average of two magnitude spectra fazekasgy@37: mS0 = abs(inputbuffers[0])[0:fftsize/2] fazekasgy@37: mS1 = abs(inputbuffers[1])[0:fftsize/2] fazekasgy@37: magnitudeSpectrum = (mS0 + mS1) / 2 fazekasgy@37: else : fazekasgy@37: complexSpectrum = inputbuffers[0] fazekasgy@37: magnitudeSpectrum = abs(complexSpectrum)[0:fftsize/2] fazekasgy@37: fazekasgy@37: # do the frequency warping and MFCC computation fazekasgy@37: melSpectrum = self.warpSpectrum(magnitudeSpectrum) fazekasgy@37: melCepstrum = self.getMFCCs(melSpectrum,cn=True) fazekasgy@52: # print melSpectrum,melCepstrum fazekasgy@37: fazekasgy@37: # returning the values: fazekasgy@37: outputs = FeatureSet() fazekasgy@37: fazekasgy@37: # 1) full initialisation example using a FeatureList fazekasgy@37: f_mfccs = Feature() fazekasgy@37: f_mfccs.values = melCepstrum[self.cnull:] fazekasgy@37: outputs[0] = FeatureList(f_mfccs) fazekasgy@37: fazekasgy@37: # 2) simplified: when only one feature is required, fazekasgy@37: # the FeatureList() can be omitted fazekasgy@37: outputs[1] = Feature(melSpectrum) fazekasgy@37: fazekasgy@37: # this is equivalint to writing : fazekasgy@37: # outputs[1] = Feature() fazekasgy@37: # outputs[1].values = melSpectrum fazekasgy@37: # or using keyword args: Feature(values = melSpectrum) fazekasgy@37: fazekasgy@37: return outputs fazekasgy@37: fazekasgy@37: # process channels separately (stack the returned arrays) fazekasgy@37: def process2ch(self,inputbuffers,timestamp): fazekasgy@37: fazekasgy@37: fftsize = self.m_blockSize fazekasgy@37: fazekasgy@37: complexSpectrum0 = inputbuffers[0] fazekasgy@37: complexSpectrum1 = inputbuffers[1] fazekasgy@37: fazekasgy@37: magnitudeSpectrum0 = abs(complexSpectrum0)[0:fftsize/2] fazekasgy@37: magnitudeSpectrum1 = abs(complexSpectrum1)[0:fftsize/2] fazekasgy@37: fazekasgy@37: # do the computations fazekasgy@37: melSpectrum0 = self.warpSpectrum(magnitudeSpectrum0) fazekasgy@37: melCepstrum0 = self.getMFCCs(melSpectrum0,cn=True) fazekasgy@37: melSpectrum1 = self.warpSpectrum(magnitudeSpectrum1) fazekasgy@37: melCepstrum1 = self.getMFCCs(melSpectrum1,cn=True) fazekasgy@37: fazekasgy@37: outputs = FeatureSet() fazekasgy@37: outputs[0] = Feature(hstack((melCepstrum1[self.cnull:],melCepstrum0[self.cnull:]))) fazekasgy@37: outputs[1] = Feature(hstack((melSpectrum1,melSpectrum0))) fazekasgy@37: fazekasgy@37: return outputs fazekasgy@37: fazekasgy@37: fazekasgy@37: def getRemainingFeatures(self): fazekasgy@37: if not self.update() : return [] fazekasgy@37: frameSampleStart = 0 fazekasgy@37: fazekasgy@37: output_featureSet = FeatureSet() fazekasgy@37: fazekasgy@37: # the filter is the third output (index starts from zero) fazekasgy@37: output_featureSet[2] = flist = FeatureList() fazekasgy@37: fazekasgy@37: while True: fazekasgy@37: f = Feature() fazekasgy@37: f.hasTimestamp = True fazekasgy@37: f.timestamp = frame2RealTime(frameSampleStart,self.m_inputSampleRate) fazekasgy@37: try : fazekasgy@37: f.values = self.filterIter.next() fazekasgy@37: except StopIteration : fazekasgy@37: break fazekasgy@37: flist.append(f) fazekasgy@37: frameSampleStart += self.m_stepSize fazekasgy@37: fazekasgy@37: return output_featureSet fazekasgy@37: