fazekasgy@52: '''PyMFCC_legacy.py - This example Vampy plugin demonstrates fazekasgy@52: how to return sprectrogram-like features. fazekasgy@52: fazekasgy@52: This plugin has frequency domain input and is using fazekasgy@52: the legacy input interface: the frequency samples are fazekasgy@52: passed as python list of complex numbers. fazekasgy@52: fazekasgy@52: Note: This is not the adviced way of writing plugins, fazekasgy@52: since the input interfaces provided for Numpy such as the fazekasgy@52: Numpy Array interface (flag: vf_ARRAY) are much faster. fazekasgy@52: fazekasgy@52: This plugin is using Numpy, but it does not rely on Vampy's fazekasgy@52: capability of passing Numpy arrays to the process directly. fazekasgy@52: However, it returns Numpy arrays from the process. fazekasgy@52: fazekasgy@52: Centre for Digital Music, Queen Mary University of London. fazekasgy@52: Copyright 2006 Gyorgy Fazekas, QMUL. fazekasgy@52: (See Vamp API for licence information.) fazekasgy@52: fazekasgy@52: Constants for Mel frequency conversion and filter fazekasgy@52: centre calculation are taken from the GNU GPL licenced fazekasgy@52: Freespeech library. Copyright (C) 1999 Jean-Marc Valin fazekasgy@52: ''' fazekasgy@52: fazekasgy@52: import sys,numpy fazekasgy@52: from numpy import log,exp,floor,sum fazekasgy@52: from numpy import * fazekasgy@52: from numpy.fft import * fazekasgy@52: import vampy fazekasgy@52: from vampy import * fazekasgy@52: fazekasgy@52: fazekasgy@52: class melScaling(object): fazekasgy@52: fazekasgy@52: def __init__(self,sampleRate,inputSize,numBands,minHz = 0,maxHz = None): fazekasgy@52: '''Initialise frequency warping and DCT matrix. fazekasgy@52: Parameters: fazekasgy@52: sampleRate: audio sample rate fazekasgy@52: inputSize: length of magnitude spectrum (half of FFT size assumed) fazekasgy@52: numBands: number of mel Bands (MFCCs) fazekasgy@52: minHz: lower bound of warping (default = DC) fazekasgy@52: maxHz: higher bound of warping (default = Nyquist frequency) fazekasgy@52: ''' fazekasgy@52: self.sampleRate = sampleRate fazekasgy@52: self.NqHz = sampleRate / 2.0 fazekasgy@52: self.minHz = minHz fazekasgy@52: if maxHz is None : maxHz = self.NqHz fazekasgy@52: self.maxHz = maxHz fazekasgy@52: self.inputSize = inputSize fazekasgy@52: self.numBands = numBands fazekasgy@52: self.valid = False fazekasgy@52: self.updated = False fazekasgy@52: fazekasgy@52: fazekasgy@52: def update(self): fazekasgy@52: # make sure this will run only once if called from a vamp process fazekasgy@52: fazekasgy@52: if self.updated: return self.valid fazekasgy@52: self.updated = True fazekasgy@52: self.valid = False fazekasgy@52: print 'Updating parameters and recalculating filters: ' fazekasgy@52: print 'Nyquist: ',self.NqHz fazekasgy@52: fazekasgy@52: if self.maxHz > self.NqHz : fazekasgy@52: raise Exception('Maximum frequency must be smaller than the Nyquist frequency') fazekasgy@52: fazekasgy@52: self.maxMel = 1000*log(1+self.maxHz/700.0)/log(1+1000.0/700.0) fazekasgy@52: self.minMel = 1000*log(1+self.minHz/700.0)/log(1+1000.0/700.0) fazekasgy@52: print 'minHz:%s\nmaxHz:%s\nminMel:%s\nmaxMel:%s\n' %(self.minHz,self.maxHz,self.minMel,self.maxMel) fazekasgy@52: self.filterMatrix = self.getFilterMatrix(self.inputSize,self.numBands) fazekasgy@52: self.DCTMatrix = self.getDCTMatrix(self.numBands) fazekasgy@52: self.filterIter = self.filterMatrix.__iter__() fazekasgy@52: self.valid = True fazekasgy@52: return self.valid fazekasgy@52: fazekasgy@52: # try : fazekasgy@52: # self.maxMel = 1000*log(1+self.maxHz/700.0)/log(1+1000.0/700.0) fazekasgy@52: # self.minMel = 1000*log(1+self.minHz/700.0)/log(1+1000.0/700.0) fazekasgy@52: # self.filterMatrix = self.getFilterMatrix(self.inputSize,self.numBands) fazekasgy@52: # self.DCTMatrix = self.getDCTMatrix(self.numBands) fazekasgy@52: # self.filterIter = self.filterMatrix.__iter__() fazekasgy@52: # self.valid = True fazekasgy@52: # return True fazekasgy@52: # except : fazekasgy@52: # print "Invalid parameter setting encountered in MelScaling class." fazekasgy@52: # return False fazekasgy@52: # return True fazekasgy@52: fazekasgy@52: def getFilterCentres(self,inputSize,numBands): fazekasgy@52: '''Calculate Mel filter centres around FFT bins. fazekasgy@52: This function calculates two extra bands at the edges for fazekasgy@52: finding the starting and end point of the first and last fazekasgy@52: actual filters.''' fazekasgy@52: centresMel = numpy.array(xrange(numBands+2)) * (self.maxMel-self.minMel)/(numBands+1) + self.minMel fazekasgy@52: centresBin = numpy.floor(0.5 + 700.0*inputSize*(exp(centresMel*log(1+1000.0/700.0)/1000.0)-1)/self.NqHz) fazekasgy@52: return numpy.array(centresBin,int) fazekasgy@52: fazekasgy@52: def getFilterMatrix(self,inputSize,numBands): fazekasgy@52: '''Compose the Mel scaling matrix.''' fazekasgy@52: filterMatrix = numpy.zeros((numBands,inputSize)) fazekasgy@52: self.filterCentres = self.getFilterCentres(inputSize,numBands) fazekasgy@52: for i in xrange(numBands) : fazekasgy@52: start,centre,end = self.filterCentres[i:i+3] fazekasgy@52: self.setFilter(filterMatrix[i],start,centre,end) fazekasgy@52: return filterMatrix.transpose() fazekasgy@52: fazekasgy@52: def setFilter(self,filt,filterStart,filterCentre,filterEnd): fazekasgy@52: '''Calculate a single Mel filter.''' fazekasgy@52: k1 = numpy.float32(filterCentre-filterStart) fazekasgy@52: k2 = numpy.float32(filterEnd-filterCentre) fazekasgy@52: up = (numpy.array(xrange(filterStart,filterCentre))-filterStart)/k1 fazekasgy@52: dn = (filterEnd-numpy.array(xrange(filterCentre,filterEnd)))/k2 fazekasgy@52: filt[filterStart:filterCentre] = up fazekasgy@52: filt[filterCentre:filterEnd] = dn fazekasgy@52: fazekasgy@52: def warpSpectrum(self,magnitudeSpectrum): fazekasgy@52: '''Compute the Mel scaled spectrum.''' fazekasgy@52: return numpy.dot(magnitudeSpectrum,self.filterMatrix) fazekasgy@52: fazekasgy@52: def getDCTMatrix(self,size): fazekasgy@52: '''Calculate the square DCT transform matrix. Results are fazekasgy@52: equivalent to Matlab dctmtx(n) but with 64 bit precision.''' fazekasgy@52: DCTmx = numpy.array(xrange(size),numpy.float64).repeat(size).reshape(size,size) fazekasgy@52: DCTmxT = numpy.pi * (DCTmx.transpose()+0.5) / size fazekasgy@52: DCTmxT = (1.0/sqrt( size / 2.0)) * cos(DCTmx * DCTmxT) fazekasgy@52: DCTmxT[0] = DCTmxT[0] * (sqrt(2.0)/2.0) fazekasgy@52: return DCTmxT fazekasgy@52: fazekasgy@52: def dct(self,data_matrix): fazekasgy@52: '''Compute DCT of input matrix.''' fazekasgy@52: return numpy.dot(self.DCTMatrix,data_matrix) fazekasgy@52: fazekasgy@52: def getMFCCs(self,warpedSpectrum,cn=True): fazekasgy@52: '''Compute MFCC coefficients from Mel warped magnitude spectrum.''' fazekasgy@52: mfccs=self.dct(numpy.log(warpedSpectrum)) fazekasgy@52: if cn is False : mfccs[0] = 0.0 fazekasgy@52: return mfccs fazekasgy@52: fazekasgy@52: fazekasgy@52: class PyMFCC_legacy(melScaling): fazekasgy@52: fazekasgy@52: def __init__(self,inputSampleRate): fazekasgy@52: fazekasgy@52: # flags for setting some Vampy options fazekasgy@52: self.vampy_flags = vf_DEBUG | vf_REALTIME fazekasgy@52: fazekasgy@52: self.m_inputSampleRate = int(inputSampleRate) fazekasgy@52: self.m_stepSize = 512 fazekasgy@52: self.m_blockSize = 2048 fazekasgy@52: self.m_channels = 1 fazekasgy@52: self.numBands = 40 fazekasgy@52: self.cnull = 1 fazekasgy@52: self.two_ch = False fazekasgy@52: melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize/2,self.numBands) fazekasgy@52: fazekasgy@52: def initialise(self,channels,stepSize,blockSize): fazekasgy@52: self.m_channels = channels fazekasgy@52: self.m_stepSize = stepSize fazekasgy@52: self.m_blockSize = blockSize fazekasgy@52: self.window = numpy.hamming(blockSize) fazekasgy@52: melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize/2,self.numBands) fazekasgy@52: return True fazekasgy@52: fazekasgy@52: def getMaker(self): fazekasgy@52: return 'Vampy Test Plugins' fazekasgy@52: fazekasgy@52: def getCopyright(self): fazekasgy@52: return 'Plugin By George Fazekas' fazekasgy@52: fazekasgy@52: def getName(self): fazekasgy@52: return 'Vampy Legacy FrequencyDomain MFCC Plugin' fazekasgy@52: fazekasgy@52: def getIdentifier(self): fazekasgy@52: return 'vampy-mfcc-test-legacy' fazekasgy@52: fazekasgy@52: def getDescription(self): fazekasgy@52: return 'Vampy FrequencyDomain MFCC Plugin using the Legacy interface.' fazekasgy@52: fazekasgy@52: def getMaxChannelCount(self): fazekasgy@52: return 2 fazekasgy@52: fazekasgy@52: def getInputDomain(self): fazekasgy@52: return FrequencyDomain fazekasgy@52: fazekasgy@52: def getPreferredBlockSize(self): fazekasgy@52: return 2048 fazekasgy@52: fazekasgy@52: def getPreferredStepSize(self): fazekasgy@52: return 512 fazekasgy@52: fazekasgy@52: def getOutputDescriptors(self): fazekasgy@52: fazekasgy@52: Generic = OutputDescriptor() fazekasgy@52: Generic.hasFixedBinCount=True fazekasgy@52: Generic.binCount=int(self.numBands)-self.cnull fazekasgy@52: Generic.hasKnownExtents=False fazekasgy@52: Generic.isQuantized=True fazekasgy@52: Generic.sampleType = OneSamplePerStep fazekasgy@52: fazekasgy@52: # note the inheritance of attributes (use is optional) fazekasgy@52: MFCC = OutputDescriptor(Generic) fazekasgy@52: MFCC.identifier = 'mfccs' fazekasgy@52: MFCC.name = 'MFCCs' fazekasgy@52: MFCC.description = 'MFCC Coefficients' fazekasgy@52: MFCC.binNames=map(lambda x: 'C '+str(x),range(self.cnull,int(self.numBands))) fazekasgy@52: MFCC.unit = None fazekasgy@52: if self.two_ch and self.m_channels == 2 : fazekasgy@52: MFCC.binCount = self.m_channels * (int(self.numBands)-self.cnull) fazekasgy@52: else : fazekasgy@52: MFCC.binCount = self.numBands-self.cnull fazekasgy@52: fazekasgy@52: warpedSpectrum = OutputDescriptor(Generic) fazekasgy@52: warpedSpectrum.identifier='warped-fft' fazekasgy@52: warpedSpectrum.name='Mel Scaled Spectrum' fazekasgy@52: warpedSpectrum.description='Mel Scaled Magnitide Spectrum' fazekasgy@52: warpedSpectrum.unit='Mel' fazekasgy@52: if self.two_ch and self.m_channels == 2 : fazekasgy@52: warpedSpectrum.binCount = self.m_channels * int(self.numBands) fazekasgy@52: else : fazekasgy@52: warpedSpectrum.binCount = self.numBands fazekasgy@52: fazekasgy@52: melFilter = OutputDescriptor(Generic) fazekasgy@52: melFilter.identifier = 'mel-filter-matrix' fazekasgy@52: melFilter.sampleType='FixedSampleRate' fazekasgy@52: melFilter.sampleRate=self.m_inputSampleRate/self.m_stepSize fazekasgy@52: melFilter.name='Mel Filter Matrix' fazekasgy@52: melFilter.description='Returns the created filter matrix in getRemainingFeatures.' fazekasgy@52: melFilter.unit = None fazekasgy@52: fazekasgy@52: return OutputList(MFCC,warpedSpectrum,melFilter) fazekasgy@52: fazekasgy@52: fazekasgy@52: def getParameterDescriptors(self): fazekasgy@52: fazekasgy@52: melbands = ParameterDescriptor() fazekasgy@52: melbands.identifier='melbands' fazekasgy@52: melbands.name='Number of bands (coefficients)' fazekasgy@52: melbands.description='Set the number of coefficients.' fazekasgy@52: melbands.unit = '' fazekasgy@52: melbands.minValue = 2 fazekasgy@52: melbands.maxValue = 128 fazekasgy@52: melbands.defaultValue = 40 fazekasgy@52: melbands.isQuantized = True fazekasgy@52: melbands.quantizeStep = 1 fazekasgy@52: fazekasgy@52: cnull = ParameterDescriptor() fazekasgy@52: cnull.identifier='cnull' fazekasgy@52: cnull.name='Return C0' fazekasgy@52: cnull.description='Select if the DC coefficient is required.' fazekasgy@52: cnull.unit = None fazekasgy@52: cnull.minValue = 0 fazekasgy@52: cnull.maxValue = 1 fazekasgy@52: cnull.defaultValue = 0 fazekasgy@52: cnull.isQuantized = True fazekasgy@52: cnull.quantizeStep = 1 fazekasgy@52: fazekasgy@52: two_ch = ParameterDescriptor(cnull) fazekasgy@52: two_ch.identifier='two_ch' fazekasgy@52: two_ch.name='Process channels separately' fazekasgy@52: two_ch.description='Process two channel files separately.' fazekasgy@52: two_ch.defaultValue = False fazekasgy@52: fazekasgy@52: minHz = ParameterDescriptor() fazekasgy@52: minHz.identifier='minHz' fazekasgy@52: minHz.name='minimum frequency' fazekasgy@52: minHz.description='Set the lower frequency bound.' fazekasgy@52: minHz.unit='Hz' fazekasgy@52: minHz.minValue = 0 fazekasgy@52: minHz.maxValue = 24000 fazekasgy@52: minHz.defaultValue = 0 fazekasgy@52: minHz.isQuantized = True fazekasgy@52: minHz.quantizeStep = 1.0 fazekasgy@52: fazekasgy@52: maxHz = ParameterDescriptor() fazekasgy@52: maxHz.identifier='maxHz' fazekasgy@52: maxHz.description='Set the upper frequency bound.' fazekasgy@52: maxHz.name='maximum frequency' fazekasgy@52: maxHz.unit='Hz' fazekasgy@52: maxHz.minValue = 100 fazekasgy@52: maxHz.maxValue = 24000 fazekasgy@52: maxHz.defaultValue = 11025 fazekasgy@52: maxHz.isQuantized = True fazekasgy@52: maxHz.quantizeStep = 100 fazekasgy@52: fazekasgy@52: return ParameterList(melbands,minHz,maxHz,cnull,two_ch) fazekasgy@52: fazekasgy@52: fazekasgy@52: def setParameter(self,paramid,newval): fazekasgy@52: self.valid = False fazekasgy@52: if paramid == 'minHz' : fazekasgy@52: if newval < self.maxHz and newval < self.NqHz : fazekasgy@52: self.minHz = float(newval) fazekasgy@52: print 'minHz: ', self.minHz fazekasgy@52: if paramid == 'maxHz' : fazekasgy@52: print 'trying to set maxHz to: ',newval fazekasgy@52: if newval < self.NqHz and newval > self.minHz+1000 : fazekasgy@52: self.maxHz = float(newval) fazekasgy@52: else : fazekasgy@52: self.maxHz = self.NqHz fazekasgy@52: print 'set to: ',self.maxHz fazekasgy@52: if paramid == 'cnull' : fazekasgy@52: self.cnull = int(not int(newval)) fazekasgy@52: if paramid == 'melbands' : fazekasgy@52: self.numBands = int(newval) fazekasgy@52: if paramid == 'two_ch' : fazekasgy@52: self.two_ch = bool(newval) fazekasgy@52: fazekasgy@52: return fazekasgy@52: fazekasgy@52: def getParameter(self,paramid): fazekasgy@52: if paramid == 'minHz' : fazekasgy@52: return float(self.minHz) fazekasgy@52: if paramid == 'maxHz' : fazekasgy@52: return float(self.maxHz) fazekasgy@52: if paramid == 'cnull' : fazekasgy@52: return float(not int(self.cnull)) fazekasgy@52: if paramid == 'melbands' : fazekasgy@52: return float(self.numBands) fazekasgy@52: if paramid == 'two_ch' : fazekasgy@52: return float(self.two_ch) fazekasgy@52: else: fazekasgy@52: return 0.0 fazekasgy@52: fazekasgy@52: # set numpy process using the 'use_numpy_interface' flag fazekasgy@52: def process(self,inputbuffers,timestamp): fazekasgy@52: fazekasgy@52: if not self.update() : return None fazekasgy@52: fazekasgy@52: if self.m_channels == 2 and self.two_ch : fazekasgy@52: return self.process2ch(inputbuffers,timestamp) fazekasgy@52: fazekasgy@52: fftsize = self.m_blockSize fazekasgy@52: fazekasgy@52: if self.m_channels > 1 : fazekasgy@52: # take the mean of the two magnitude spectra fazekasgy@52: complexSpectrum0 = array(inputbuffers[0]) fazekasgy@52: complexSpectrum1 = array(inputbuffers[1]) fazekasgy@52: magnitudeSpectrum0 = abs(complexSpectrum0)[0:fftsize/2] fazekasgy@52: magnitudeSpectrum1 = abs(complexSpectrum1)[0:fftsize/2] fazekasgy@52: magnitudeSpectrum = (magnitudeSpectrum0 + magnitudeSpectrum1) / 2 fazekasgy@52: else : fazekasgy@52: complexSpectrum = array(inputbuffers[0]) fazekasgy@52: magnitudeSpectrum = abs(complexSpectrum)[0:fftsize/2] fazekasgy@52: fazekasgy@52: # do the computation fazekasgy@52: melSpectrum = self.warpSpectrum(magnitudeSpectrum) fazekasgy@52: melCepstrum = self.getMFCCs(melSpectrum,cn=True) fazekasgy@52: fazekasgy@52: outputs = FeatureSet() fazekasgy@52: outputs[0] = Feature(melCepstrum[self.cnull:]) fazekasgy@52: outputs[1] = Feature(melSpectrum) fazekasgy@52: return outputs fazekasgy@52: fazekasgy@52: fazekasgy@52: # process channels separately (stack the returned arrays) fazekasgy@52: def process2ch(self,inputbuffers,timestamp): fazekasgy@52: fazekasgy@52: fftsize = self.m_blockSize fazekasgy@52: fazekasgy@52: complexSpectrum0 = array(inputbuffers[0]) fazekasgy@52: complexSpectrum1 = array(inputbuffers[1]) fazekasgy@52: fazekasgy@52: magnitudeSpectrum0 = abs(complexSpectrum0)[0:fftsize/2] fazekasgy@52: magnitudeSpectrum1 = abs(complexSpectrum1)[0:fftsize/2] fazekasgy@52: fazekasgy@52: # do the computations fazekasgy@52: melSpectrum0 = self.warpSpectrum(magnitudeSpectrum0) fazekasgy@52: melCepstrum0 = self.getMFCCs(melSpectrum0,cn=True) fazekasgy@52: melSpectrum1 = self.warpSpectrum(magnitudeSpectrum1) fazekasgy@52: melCepstrum1 = self.getMFCCs(melSpectrum1,cn=True) fazekasgy@52: fazekasgy@52: outputs = FeatureSet() fazekasgy@52: fazekasgy@52: outputs[0] = Feature(hstack((melCepstrum1[self.cnull:],melCepstrum0[self.cnull:]))) fazekasgy@52: fazekasgy@52: outputs[1] = Feature(hstack((melSpectrum1,melSpectrum0))) fazekasgy@52: fazekasgy@52: return outputs fazekasgy@52: fazekasgy@52: fazekasgy@52: def getRemainingFeatures(self): fazekasgy@52: if not self.update() : return [] fazekasgy@52: frameSampleStart = 0 fazekasgy@52: fazekasgy@52: output_featureSet = FeatureSet() fazekasgy@52: fazekasgy@52: # the filter is the third output (index starts from zero) fazekasgy@52: output_featureSet[2] = flist = FeatureList() fazekasgy@52: fazekasgy@52: while True: fazekasgy@52: f = Feature() fazekasgy@52: f.hasTimestamp = True fazekasgy@52: f.timestamp = frame2RealTime(frameSampleStart,self.m_inputSampleRate) fazekasgy@52: try : fazekasgy@52: f.values = self.filterIter.next() fazekasgy@52: except StopIteration : fazekasgy@52: break fazekasgy@52: flist.append(f) fazekasgy@52: frameSampleStart += self.m_stepSize fazekasgy@52: fazekasgy@52: return output_featureSet