Mercurial > hg > vampy
view Example VamPy plugins/test/PyMFCC_time.py @ 37:27bab3a16c9a vampy2final
new branch Vampy2final
author | fazekasgy |
---|---|
date | Mon, 05 Oct 2009 11:28:00 +0000 |
parents | |
children |
line wrap: on
line source
'''PyMFCC_time.py - This example Vampy plugin demonstrates how to return sprectrogram-like features. This plugin has time domain input and is using the numpy array interface. Flag: vf_ARRAY Centre for Digital Music, Queen Mary University of London. Copyright 2006 Gyorgy Fazekas, QMUL. (See Vamp API for licence information.) Constants for Mel frequency conversion and filter centre calculation are taken from the GNU GPL licenced Freespeech library. Copyright (C) 1999 Jean-Marc Valin ''' import sys,numpy from numpy import log,exp,floor,sum from numpy import * from numpy.fft import * import vampy from vampy import * class melScaling(object): def __init__(self,sampleRate,inputSize,numBands,minHz = 0,maxHz = None): '''Initialise frequency warping and DCT matrix. Parameters: sampleRate: audio sample rate inputSize: length of magnitude spectrum (half of FFT size assumed) numBands: number of mel Bands (MFCCs) minHz: lower bound of warping (default = DC) maxHz: higher bound of warping (default = Nyquist frequency) ''' self.sampleRate = sampleRate self.NqHz = sampleRate / 2.0 self.minHz = minHz if maxHz is None : maxHz = self.NqHz self.maxHz = maxHz self.inputSize = inputSize self.numBands = numBands self.valid = False self.updated = False def update(self): # make sure this will run only once if called from a vamp process if self.updated: return self.valid self.updated = True self.valid = False print 'Updating parameters and recalculating filters: ' print 'Nyquist: ',self.NqHz if self.maxHz > self.NqHz : raise Exception('Maximum frequency must be smaller than the Nyquist frequency') self.maxMel = 1000*log(1+self.maxHz/700.0)/log(1+1000.0/700.0) self.minMel = 1000*log(1+self.minHz/700.0)/log(1+1000.0/700.0) print 'minHz:%s\nmaxHz:%s\nminMel:%s\nmaxMel:%s\n' %(self.minHz,self.maxHz,self.minMel,self.maxMel) self.filterMatrix = self.getFilterMatrix(self.inputSize,self.numBands) self.DCTMatrix = self.getDCTMatrix(self.numBands) self.filterIter = self.filterMatrix.__iter__() self.valid = True return self.valid # try : # self.maxMel = 1000*log(1+self.maxHz/700.0)/log(1+1000.0/700.0) # self.minMel = 1000*log(1+self.minHz/700.0)/log(1+1000.0/700.0) # self.filterMatrix = self.getFilterMatrix(self.inputSize,self.numBands) # self.DCTMatrix = self.getDCTMatrix(self.numBands) # self.filterIter = self.filterMatrix.__iter__() # self.valid = True # return True # except : # print "Invalid parameter setting encountered in MelScaling class." # return False # return True def getFilterCentres(self,inputSize,numBands): '''Calculate Mel filter centres around FFT bins. This function calculates two extra bands at the edges for finding the starting and end point of the first and last actual filters.''' centresMel = numpy.array(xrange(numBands+2)) * (self.maxMel-self.minMel)/(numBands+1) + self.minMel centresBin = numpy.floor(0.5 + 700.0*inputSize*(exp(centresMel*log(1+1000.0/700.0)/1000.0)-1)/self.NqHz) return numpy.array(centresBin,int) def getFilterMatrix(self,inputSize,numBands): '''Compose the Mel scaling matrix.''' filterMatrix = numpy.zeros((numBands,inputSize)) self.filterCentres = self.getFilterCentres(inputSize,numBands) for i in xrange(numBands) : start,centre,end = self.filterCentres[i:i+3] self.setFilter(filterMatrix[i],start,centre,end) return filterMatrix.transpose() def setFilter(self,filt,filterStart,filterCentre,filterEnd): '''Calculate a single Mel filter.''' k1 = numpy.float32(filterCentre-filterStart) k2 = numpy.float32(filterEnd-filterCentre) up = (numpy.array(xrange(filterStart,filterCentre))-filterStart)/k1 dn = (filterEnd-numpy.array(xrange(filterCentre,filterEnd)))/k2 filt[filterStart:filterCentre] = up filt[filterCentre:filterEnd] = dn def warpSpectrum(self,magnitudeSpectrum): '''Compute the Mel scaled spectrum.''' return numpy.dot(magnitudeSpectrum,self.filterMatrix) def getDCTMatrix(self,size): '''Calculate the square DCT transform matrix. Results are equivalent to Matlab dctmtx(n) but with 64 bit precision.''' DCTmx = numpy.array(xrange(size),numpy.float64).repeat(size).reshape(size,size) DCTmxT = numpy.pi * (DCTmx.transpose()+0.5) / size DCTmxT = (1.0/sqrt( size / 2.0)) * cos(DCTmx * DCTmxT) DCTmxT[0] = DCTmxT[0] * (sqrt(2.0)/2.0) return DCTmxT def dct(self,data_matrix): '''Compute DCT of input matrix.''' return numpy.dot(self.DCTMatrix,data_matrix) def getMFCCs(self,warpedSpectrum,cn=True): '''Compute MFCC coefficients from Mel warped magnitude spectrum.''' mfccs=self.dct(numpy.log(warpedSpectrum)) if cn is False : mfccs[0] = 0.0 return mfccs class PyMFCC_time(melScaling): def __init__(self,inputSampleRate): # flags for setting some Vampy options self.vampy_flags = vf_DEBUG | vf_ARRAY | vf_REALTIME self.m_inputSampleRate = int(inputSampleRate) self.m_stepSize = 512 self.m_blockSize = 2048 self.m_channels = 1 self.numBands = 40 self.cnull = 1 self.two_ch = False melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize/2,self.numBands) def initialise(self,channels,stepSize,blockSize): self.m_channels = channels self.m_stepSize = stepSize self.m_blockSize = blockSize self.window = numpy.hamming(blockSize) melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize/2,self.numBands) return True def getMaker(self): return 'Vampy Test Plugins' def getCopyright(self): return 'Plugin By George Fazekas' def getName(self): return 'Vampy TimeDomain MFCC Plugin' def getIdentifier(self): return 'vampy-mfcc-test-timedomain' def getDescription(self): return 'A simple MFCC plugin. (TimeDomain)' def getMaxChannelCount(self): return 2 def getInputDomain(self): return TimeDomain def getPreferredBlockSize(self): return 2048 def getPreferredStepSize(self): return 512 def getOutputDescriptors(self): Generic = OutputDescriptor() Generic.hasFixedBinCount=True Generic.binCount=int(self.numBands)-self.cnull Generic.hasKnownExtents=False Generic.isQuantized=True Generic.sampleType = OneSamplePerStep # note the inheritance of attributes (use is optional) MFCC = OutputDescriptor(Generic) MFCC.identifier = 'mfccs' MFCC.name = 'MFCCs' MFCC.description = 'MFCC Coefficients' MFCC.binNames=map(lambda x: 'C '+str(x),range(self.cnull,int(self.numBands))) MFCC.unit = None if self.two_ch and self.m_channels == 2 : MFCC.binCount = self.m_channels * (int(self.numBands)-self.cnull) else : MFCC.binCount = self.numBands-self.cnull warpedSpectrum = OutputDescriptor(Generic) warpedSpectrum.identifier='warped-fft' warpedSpectrum.name='Mel Scaled Spectrum' warpedSpectrum.description='Mel Scaled Magnitide Spectrum' warpedSpectrum.unit='Mel' if self.two_ch and self.m_channels == 2 : warpedSpectrum.binCount = self.m_channels * int(self.numBands) else : warpedSpectrum.binCount = self.numBands melFilter = OutputDescriptor(Generic) melFilter.identifier = 'mel-filter-matrix' melFilter.sampleType='FixedSampleRate' melFilter.sampleRate=self.m_inputSampleRate/self.m_stepSize melFilter.name='Mel Filter Matrix' melFilter.description='Returns the created filter matrix in getRemainingFeatures.' melFilter.unit = None return OutputList(MFCC,warpedSpectrum,melFilter) def getParameterDescriptors(self): melbands = ParameterDescriptor() melbands.identifier='melbands' melbands.name='Number of bands (coefficients)' melbands.description='Set the number of coefficients.' melbands.unit = '' melbands.minValue = 2 melbands.maxValue = 128 melbands.defaultValue = 40 melbands.isQuantized = True melbands.quantizeStep = 1 cnull = ParameterDescriptor() cnull.identifier='cnull' cnull.name='Return C0' cnull.description='Select if the DC coefficient is required.' cnull.unit = None cnull.minValue = 0 cnull.maxValue = 1 cnull.defaultValue = 0 cnull.isQuantized = True cnull.quantizeStep = 1 two_ch = ParameterDescriptor(cnull) two_ch.identifier='two_ch' two_ch.name='Process channels separately' two_ch.description='Process two channel files separately.' two_ch.defaultValue = False minHz = ParameterDescriptor() minHz.identifier='minHz' minHz.name='minimum frequency' minHz.description='Set the lower frequency bound.' minHz.unit='Hz' minHz.minValue = 0 minHz.maxValue = 24000 minHz.defaultValue = 0 minHz.isQuantized = True minHz.quantizeStep = 1.0 maxHz = ParameterDescriptor() maxHz.identifier='maxHz' maxHz.description='Set the upper frequency bound.' maxHz.name='maximum frequency' maxHz.unit='Hz' maxHz.minValue = 100 maxHz.maxValue = 24000 maxHz.defaultValue = 11025 maxHz.isQuantized = True maxHz.quantizeStep = 100 return ParameterList(melbands,minHz,maxHz,cnull,two_ch) def setParameter(self,paramid,newval): self.valid = False if paramid == 'minHz' : if newval < self.maxHz and newval < self.NqHz : self.minHz = float(newval) print 'minHz: ', self.minHz if paramid == 'maxHz' : print 'trying to set maxHz to: ',newval if newval < self.NqHz and newval > self.minHz+1000 : self.maxHz = float(newval) else : self.maxHz = self.NqHz print 'set to: ',self.maxHz if paramid == 'cnull' : self.cnull = int(not int(newval)) if paramid == 'melbands' : self.numBands = int(newval) if paramid == 'two_ch' : self.two_ch = bool(newval) return def getParameter(self,paramid): if paramid == 'minHz' : return float(self.minHz) if paramid == 'maxHz' : return float(self.maxHz) if paramid == 'cnull' : return float(not int(self.cnull)) if paramid == 'melbands' : return float(self.numBands) if paramid == 'two_ch' : return float(self.two_ch) else: return 0.0 # set numpy process using the 'use_numpy_interface' flag def process(self,inputbuffers,timestamp): if self.m_channels == 2 and self.two_ch : return self.process2ch(inputbuffers,timestamp) # calculate the filter and DCT matrices, check # if they are computable given a set of parameters # (we only do this once, when the process is called first) if not self.update() : return None fftsize = self.m_blockSize if self.m_channels > 1 : audioSamples = (inputbuffers[0]+inputbuffers[1])/2 else : audioSamples = inputbuffers[0] #complexSpectrum = frombuffer(membuffer[0],complex64,-1,8) complexSpectrum = fft(self.window*audioSamples,fftsize) magnitudeSpectrum = abs(complexSpectrum)[0:fftsize/2] / (fftsize/2) # do the computation melSpectrum = self.warpSpectrum(magnitudeSpectrum) melCepstrum = self.getMFCCs(melSpectrum,cn=True) # output feature set (the builtin dict type can also be used) outputs = FeatureSet() outputs[0] = Feature(melCepstrum[self.cnull:]) outputs[1] = Feature(melSpectrum) return outputs # process two channel files (stack the returned arrays) def process2ch(self,inputbuffers,timestamp): if not self.update() : return None fftsize = self.m_blockSize audioSamples0 = inputbuffers[0] audioSamples1 = inputbuffers[1] complexSpectrum0 = fft(self.window*audioSamples0,fftsize) complexSpectrum1 = fft(self.window*audioSamples1,fftsize) magnitudeSpectrum0 = abs(complexSpectrum0)[0:fftsize/2] / (fftsize/2) magnitudeSpectrum1 = abs(complexSpectrum1)[0:fftsize/2] / (fftsize/2) # do the computation melSpectrum0 = self.warpSpectrum(magnitudeSpectrum0) melCepstrum0 = self.getMFCCs(melSpectrum0,cn=True) melSpectrum1 = self.warpSpectrum(magnitudeSpectrum1) melCepstrum1 = self.getMFCCs(melSpectrum1,cn=True) outputs = FeatureSet() outputs[0] = Feature(hstack((melCepstrum1[self.cnull:],melCepstrum0[self.cnull:]))) outputs[1] = Feature(hstack((melSpectrum1,melSpectrum0))) return outputs def getRemainingFeatures(self): if not self.update() : return [] frameSampleStart = 0 output_featureSet = FeatureSet() # the filter is the third output (index starts from zero) output_featureSet[2] = flist = FeatureList() while True: f = Feature() f.hasTimestamp = True f.timestamp = frame2RealTime(frameSampleStart,self.m_inputSampleRate) try : f.values = self.filterIter.next() except StopIteration : break flist.append(f) frameSampleStart += self.m_stepSize return output_featureSet