Mercurial > hg > vampy
changeset 52:d56f48aafb99
Updated some example plugins.
author | fazekasgy |
---|---|
date | Thu, 08 Oct 2009 08:59:08 +0000 |
parents | c1e4f706ca9a |
children | 7e59caea821b |
files | Example VamPy plugins/PyMFCC.py Example VamPy plugins/PySpectralCentroid.py Example VamPy plugins/test/PyMFCC_legacy.py |
diffstat | 3 files changed, 546 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/Example VamPy plugins/PyMFCC.py Thu Oct 08 08:47:28 2009 +0000 +++ b/Example VamPy plugins/PyMFCC.py Thu Oct 08 08:59:08 2009 +0000 @@ -329,6 +329,7 @@ # do the frequency warping and MFCC computation melSpectrum = self.warpSpectrum(magnitudeSpectrum) melCepstrum = self.getMFCCs(melSpectrum,cn=True) + # print melSpectrum,melCepstrum # returning the values: outputs = FeatureSet()
# File: Example VamPy plugins/PySpectralCentroid.py (new file in this changeset)
'''PySpectralCentroid.py - Example plugin demonstrates
how to write a C style plugin using VamPy in pure Python.
This plugin also introduces the use of the builtin vampy
extension module.

The plugin has frequency domain input and is using the
legacy interface: the FFT output is passed as a list
of complex numbers.

Outputs:
1) Spectral centroid

Note: This is not the advised way of writing Vampy plugins now,
since the interfaces provided for Numpy are at least 5 times
faster. However, this is still a nice and easy to understand
example, which also shows how one can write a reasonable
plugin without having Numpy installed.

Warning: Earlier versions of this plugin are now obsolete.
(They were using the legacy interface of Vampy 1 which
did not distinguish between time and frequency domain inputs.)

Centre for Digital Music, Queen Mary University of London.
Copyright (C) 2009 Gyorgy Fazekas, QMUL. (See Vamp sources
for licence information.)

'''

# import the names we use from vampy
from vampy import Feature,FeatureSet,ParameterDescriptor
from vampy import OutputDescriptor,FrequencyDomain,OneSamplePerStep

from math import sqrt


class PySpectralCentroid:
    '''Spectral centroid plugin using the legacy (list of complex) interface.'''

    # Must agree with the defaultValue advertised in getParameterDescriptors():
    # a host is not required to call setParameter() for default values.
    DEFAULT_THRESHOLD = 0.05

    def __init__(self, inputSampleRate):
        '''Store the sample rate and reset the processing state.'''
        self.m_inputSampleRate = inputSampleRate
        self.m_stepSize = 0
        self.m_blockSize = 0
        self.m_channels = 0
        self.previousSample = 0.0
        self.threshold = self.DEFAULT_THRESHOLD

    def initialise(self, channels, stepSize, blockSize):
        '''Record the processing configuration; always returns True.'''
        self.m_channels = channels
        self.m_stepSize = stepSize
        self.m_blockSize = blockSize
        return True

    def getMaker(self):
        return 'Vampy Example Plugins'

    def getName(self):
        return 'Spectral Centroid (using legacy process interface)'

    def getIdentifier(self):
        return 'vampy-sc3'

    def getMaxChannelCount(self):
        return 1

    def getInputDomain(self):
        return FrequencyDomain

    def getOutputDescriptors(self):
        '''Describe the single output: one centroid value per step.'''
        cod = OutputDescriptor()
        cod.identifier = 'vampy-sc3'
        cod.name = 'Spectral Centroid'
        cod.description = 'Spectral Centroid (Brightness)'
        # process() computes bin frequencies as i * sampleRate / blockSize,
        # so the centroid is a continuous value expressed in Hz.
        cod.unit = 'Hz'
        cod.hasFixedBinCount = True
        cod.binCount = 1
        cod.hasKnownExtents = False
        cod.isQuantized = False
        cod.sampleType = OneSamplePerStep
        return cod

    def getParameterDescriptors(self):
        '''Describe the noise threshold parameter.'''
        thd = ParameterDescriptor()
        thd.identifier = 'threshold'
        thd.name = 'Noise threshold'
        thd.description = 'Return null or delete this function if not needed.'
        thd.unit = 'v'
        thd.minValue = 0.0
        thd.maxValue = 0.5
        thd.defaultValue = self.DEFAULT_THRESHOLD
        thd.isQuantized = False
        return thd

    def setParameter(self, paramid, newval):
        '''Set the named parameter; unknown identifiers are ignored.'''
        if paramid == 'threshold':
            self.threshold = newval
        return

    def getParameter(self, paramid):
        '''Return the named parameter, or 0.0 for unknown identifiers.'''
        if paramid == 'threshold':
            return self.threshold
        return 0.0

    def process(self, inputbuffers, timestamp):
        '''Compute the spectral centroid of one frequency domain frame.

        inputbuffers[0] is read as a sequence of complex FFT bins. The last
        bin is discarded and the first (DC) bin is skipped. The centroid is
        0.0 when the summed bin magnitude does not exceed the threshold.
        Returns a FeatureSet whose output 0 carries the centroid.
        '''
        # this is a 1 channel frequency domain plugin, therefore
        # inputbuffers contain (block size / 2) + 1 complex numbers
        # corresponding to the FFT output from DC to Nyquist inclusive
        bins = inputbuffers[0][:-1]

        centroid = 0.0
        output = FeatureSet()

        # overall level of the frame (DC excluded), used as a noise gate
        level = 0
        for value in bins[1:]:
            level = level + abs(value)

        if level > self.threshold:
            # float() keeps the bin frequency from being truncated when the
            # host hands over an integer sample rate
            sampleRate = float(self.m_inputSampleRate)
            halfBlock = self.m_blockSize // 2
            weighted = 0.0
            total = 0.0

            for i in range(1, len(bins)):
                re = bins[i].real
                im = bins[i].imag
                freq = i * sampleRate / self.m_blockSize
                power = sqrt(re*re + im*im) / halfBlock
                total = total + power
                weighted = weighted + freq * power

            if total != 0:
                centroid = weighted / total

        output[0] = Feature()
        output[0].values = centroid
        output[0].label = str(centroid)

        return output
# File: Example VamPy plugins/test/PyMFCC_legacy.py (new file in this changeset)
'''PyMFCC_legacy.py - This example Vampy plugin demonstrates
how to return spectrogram-like features.

This plugin has frequency domain input and is using
the legacy input interface: the frequency samples are
passed as python list of complex numbers.

Note: This is not the advised way of writing plugins,
since the input interfaces provided for Numpy such as the
Numpy Array interface (flag: vf_ARRAY) are much faster.

This plugin is using Numpy, but it does not rely on Vampy's
capability of passing Numpy arrays to the process directly.
However, it returns Numpy arrays from the process.

Centre for Digital Music, Queen Mary University of London.
Copyright 2006 Gyorgy Fazekas, QMUL.
(See Vamp API for licence information.)

Constants for Mel frequency conversion and filter
centre calculation are taken from the GNU GPL licenced
Freespeech library. Copyright (C) 1999 Jean-Marc Valin
'''

import sys,numpy
from numpy import log,exp,floor,sum
from numpy import *
from numpy.fft import *
import vampy
from vampy import *


class melScaling(object):
    '''Mel filter bank and DCT used to turn magnitude spectra into MFCCs.'''

    def __init__(self,sampleRate,inputSize,numBands,minHz = 0,maxHz = None):
        '''Initialise frequency warping and DCT matrix.
        Parameters:
        sampleRate: audio sample rate
        inputSize: length of magnitude spectrum (half of FFT size assumed)
        numBands: number of mel Bands (MFCCs)
        minHz: lower bound of warping (default = DC)
        maxHz: higher bound of warping (default = Nyquist frequency)
        '''
        self.sampleRate = sampleRate
        self.NqHz = sampleRate / 2.0
        self.minHz = minHz
        if maxHz is None :
            maxHz = self.NqHz
        self.maxHz = maxHz
        self.inputSize = inputSize
        self.numBands = numBands
        # the matrices are built lazily by update()
        self.valid = False
        self.updated = False

    def _hzToMel(self,hz):
        '''Convert a frequency in Hz to the Mel scale used by the filters.'''
        return 1000*numpy.log(1+hz/700.0)/numpy.log(1+1000.0/700.0)

    def update(self):
        '''(Re)build the filter and DCT matrices if they are out of date.

        Returns True when the matrices are usable. Raises Exception when
        maxHz exceeds the Nyquist frequency.
        '''
        # make sure this will run only once if called from a vamp process
        if self.updated :
            return self.valid
        self.updated = True
        self.valid = False
        print('Updating parameters and recalculating filters: ')
        print('Nyquist:  %s' % (self.NqHz,))

        if self.maxHz > self.NqHz :
            raise Exception('Maximum frequency must be smaller than the Nyquist frequency')

        self.maxMel = self._hzToMel(self.maxHz)
        self.minMel = self._hzToMel(self.minHz)
        print('minHz:%s\nmaxHz:%s\nminMel:%s\nmaxMel:%s\n' %(self.minHz,self.maxHz,self.minMel,self.maxMel))
        self.filterMatrix = self.getFilterMatrix(self.inputSize,self.numBands)
        self.DCTMatrix = self.getDCTMatrix(self.numBands)
        # consumed row by row in getRemainingFeatures()
        self.filterIter = iter(self.filterMatrix)
        self.valid = True
        return self.valid

    def getFilterCentres(self,inputSize,numBands):
        '''Calculate Mel filter centres around FFT bins.
        This function calculates two extra bands at the edges for
        finding the starting and end point of the first and last
        actual filters.'''
        centresMel = numpy.arange(numBands+2) * (self.maxMel-self.minMel)/(numBands+1) + self.minMel
        centresBin = numpy.floor(0.5 + 700.0*inputSize*(numpy.exp(centresMel*numpy.log(1+1000.0/700.0)/1000.0)-1)/self.NqHz)
        return numpy.array(centresBin,int)

    def getFilterMatrix(self,inputSize,numBands):
        '''Compose the Mel scaling matrix (shape: inputSize x numBands).'''
        filterMatrix = numpy.zeros((numBands,inputSize))
        self.filterCentres = self.getFilterCentres(inputSize,numBands)
        for i in range(numBands) :
            start,centre,end = self.filterCentres[i:i+3]
            # each row is filled in place
            self.setFilter(filterMatrix[i],start,centre,end)
        return filterMatrix.transpose()

    def setFilter(self,filt,filterStart,filterCentre,filterEnd):
        '''Calculate a single (triangular) Mel filter in place.'''
        rise = float(filterCentre-filterStart)
        fall = float(filterEnd-filterCentre)
        # an empty slope gives an empty array, so a zero width never divides
        up = (numpy.arange(filterStart,filterCentre)-filterStart)/rise
        dn = (filterEnd-numpy.arange(filterCentre,filterEnd))/fall
        filt[filterStart:filterCentre] = up
        filt[filterCentre:filterEnd] = dn

    def warpSpectrum(self,magnitudeSpectrum):
        '''Compute the Mel scaled spectrum.'''
        return numpy.dot(magnitudeSpectrum,self.filterMatrix)

    def getDCTMatrix(self,size):
        '''Calculate the square DCT transform matrix. Results are
        equivalent to Matlab dctmtx(n) but with 64 bit precision.'''
        DCTmx = numpy.arange(size,dtype=numpy.float64).repeat(size).reshape(size,size)
        DCTmxT = numpy.pi * (DCTmx.transpose()+0.5) / size
        DCTmxT = (1.0/numpy.sqrt( size / 2.0)) * numpy.cos(DCTmx * DCTmxT)
        DCTmxT[0] = DCTmxT[0] * (numpy.sqrt(2.0)/2.0)
        return DCTmxT

    def dct(self,data_matrix):
        '''Compute DCT of input matrix.'''
        return numpy.dot(self.DCTMatrix,data_matrix)

    def getMFCCs(self,warpedSpectrum,cn=True):
        '''Compute MFCC coefficients from Mel warped magnitude spectrum.

        When cn is False the first coefficient (C0) is set to zero.
        '''
        # Silent frames and empty filters give zero band energy; clamp to the
        # smallest positive float so the logarithm stays finite and the DCT
        # does not turn -inf into NaN coefficients.
        smallest = numpy.finfo(numpy.float64).tiny
        mfccs = self.dct(numpy.log(numpy.maximum(warpedSpectrum,smallest)))
        if cn is False :
            mfccs[0] = 0.0
        return mfccs


class PyMFCC_legacy(melScaling):
    '''Frequency domain MFCC plugin using the legacy Vampy input interface.'''

    def __init__(self,inputSampleRate):
        '''Set default processing values and the mel scaling defaults.'''
        # flags for setting some Vampy options
        self.vampy_flags = vf_DEBUG | vf_REALTIME

        self.m_inputSampleRate = int(inputSampleRate)
        self.m_stepSize = 512
        self.m_blockSize = 2048
        self.m_channels = 1
        self.numBands = 40
        # number of leading coefficients dropped from the MFCC output
        # (1 = C0 is not returned)
        self.cnull = 1
        self.two_ch = False
        melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize//2,self.numBands)

    def initialise(self,channels,stepSize,blockSize):
        '''Store the processing configuration and reset the filter state.'''
        self.m_channels = channels
        self.m_stepSize = stepSize
        self.m_blockSize = blockSize
        self.window = numpy.hamming(blockSize)
        # Pass the current frequency bounds through: re-running the base
        # initialiser without them would silently discard minHz/maxHz values
        # that were set through setParameter() before initialise().
        melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize//2,self.numBands,self.minHz,self.maxHz)
        return True

    def getMaker(self):
        return 'Vampy Test Plugins'

    def getCopyright(self):
        return 'Plugin By George Fazekas'

    def getName(self):
        return 'Vampy Legacy FrequencyDomain MFCC Plugin'

    def getIdentifier(self):
        return 'vampy-mfcc-test-legacy'

    def getDescription(self):
        return 'Vampy FrequencyDomain MFCC Plugin using the Legacy interface.'

    def getMaxChannelCount(self):
        return 2

    def getInputDomain(self):
        return FrequencyDomain

    def getPreferredBlockSize(self):
        return 2048

    def getPreferredStepSize(self):
        return 512

    def getOutputDescriptors(self):
        '''Describe the three outputs: MFCCs, Mel spectrum, filter matrix.'''
        numBands = int(self.numBands)
        separate = self.two_ch and self.m_channels == 2

        Generic = OutputDescriptor()
        Generic.hasFixedBinCount=True
        Generic.binCount=numBands-self.cnull
        Generic.hasKnownExtents=False
        Generic.isQuantized=True
        Generic.sampleType = OneSamplePerStep

        # note the inheritance of attributes (use is optional)
        MFCC = OutputDescriptor(Generic)
        MFCC.identifier = 'mfccs'
        MFCC.name = 'MFCCs'
        MFCC.description = 'MFCC Coefficients'
        MFCC.binNames=['C '+str(x) for x in range(self.cnull,numBands)]
        MFCC.unit = None
        if separate :
            MFCC.binCount = self.m_channels * (numBands-self.cnull)
        else :
            MFCC.binCount = self.numBands-self.cnull

        warpedSpectrum = OutputDescriptor(Generic)
        warpedSpectrum.identifier='warped-fft'
        warpedSpectrum.name='Mel Scaled Spectrum'
        warpedSpectrum.description='Mel Scaled Magnitide Spectrum'
        warpedSpectrum.unit='Mel'
        if separate :
            warpedSpectrum.binCount = self.m_channels * numBands
        else :
            warpedSpectrum.binCount = self.numBands

        melFilter = OutputDescriptor(Generic)
        melFilter.identifier = 'mel-filter-matrix'
        # getRemainingFeatures() emits rows of the transposed filter matrix,
        # which hold numBands values each regardless of the cnull setting
        melFilter.binCount = numBands
        melFilter.sampleType='FixedSampleRate'
        # float division: an integer quotient would truncate the frame rate
        melFilter.sampleRate=float(self.m_inputSampleRate)/self.m_stepSize
        melFilter.name='Mel Filter Matrix'
        melFilter.description='Returns the created filter matrix in getRemainingFeatures.'
        melFilter.unit = None

        return OutputList(MFCC,warpedSpectrum,melFilter)

    def getParameterDescriptors(self):
        '''Describe the user adjustable parameters of the plugin.'''
        melbands = ParameterDescriptor()
        melbands.identifier='melbands'
        melbands.name='Number of bands (coefficients)'
        melbands.description='Set the number of coefficients.'
        melbands.unit = ''
        melbands.minValue = 2
        melbands.maxValue = 128
        melbands.defaultValue = 40
        melbands.isQuantized = True
        melbands.quantizeStep = 1

        cnull = ParameterDescriptor()
        cnull.identifier='cnull'
        cnull.name='Return C0'
        cnull.description='Select if the DC coefficient is required.'
        cnull.unit = None
        cnull.minValue = 0
        cnull.maxValue = 1
        cnull.defaultValue = 0
        cnull.isQuantized = True
        cnull.quantizeStep = 1

        # copies the attributes of cnull, then overrides what differs
        two_ch = ParameterDescriptor(cnull)
        two_ch.identifier='two_ch'
        two_ch.name='Process channels separately'
        two_ch.description='Process two channel files separately.'
        two_ch.defaultValue = False

        minHz = ParameterDescriptor()
        minHz.identifier='minHz'
        minHz.name='minimum frequency'
        minHz.description='Set the lower frequency bound.'
        minHz.unit='Hz'
        minHz.minValue = 0
        minHz.maxValue = 24000
        minHz.defaultValue = 0
        minHz.isQuantized = True
        minHz.quantizeStep = 1.0

        maxHz = ParameterDescriptor()
        maxHz.identifier='maxHz'
        maxHz.description='Set the upper frequency bound.'
        maxHz.name='maximum frequency'
        maxHz.unit='Hz'
        maxHz.minValue = 100
        maxHz.maxValue = 24000
        maxHz.defaultValue = 11025
        maxHz.isQuantized = True
        maxHz.quantizeStep = 100

        return ParameterList(melbands,minHz,maxHz,cnull,two_ch)

    def setParameter(self,paramid,newval):
        '''Set a parameter and schedule the filters to be recalculated.'''
        # Clear both flags: update() returns the stale 'valid' value while
        # 'updated' is still set, which would make process() return None
        # forever after a parameter change.
        self.valid = False
        self.updated = False

        if paramid == 'minHz' :
            if newval < self.maxHz and newval < self.NqHz :
                self.minHz = float(newval)
            print('minHz:  %s' % (self.minHz,))
        if paramid == 'maxHz' :
            print('trying to set maxHz to:  %s' % (newval,))
            if newval < self.NqHz and newval > self.minHz+1000 :
                self.maxHz = float(newval)
            else :
                self.maxHz = self.NqHz
            print('set to:  %s' % (self.maxHz,))
        if paramid == 'cnull' :
            # the parameter means "return C0"; cnull counts dropped coefficients
            self.cnull = int(not int(newval))
        if paramid == 'melbands' :
            self.numBands = int(newval)
        if paramid == 'two_ch' :
            self.two_ch = bool(newval)

        return

    def getParameter(self,paramid):
        '''Return a parameter as float, or 0.0 for unknown identifiers.'''
        if paramid == 'minHz' :
            return float(self.minHz)
        if paramid == 'maxHz' :
            return float(self.maxHz)
        if paramid == 'cnull' :
            return float(not int(self.cnull))
        if paramid == 'melbands' :
            return float(self.numBands)
        if paramid == 'two_ch' :
            return float(self.two_ch)
        return 0.0

    def _magnitudeSpectrum(self,buffer):
        '''Return the magnitudes of the first blockSize/2 bins of buffer.'''
        return numpy.abs(numpy.array(buffer))[0:self.m_blockSize//2]

    # set numpy process using the 'use_numpy_interface' flag
    def process(self,inputbuffers,timestamp):
        '''Compute the MFCCs and the Mel spectrum of one input frame.

        Returns None when the filters could not be set up. Two channel
        input is averaged unless separate processing was requested.
        '''
        if not self.update() :
            return None

        if self.m_channels == 2 and self.two_ch :
            return self.process2ch(inputbuffers,timestamp)

        if self.m_channels > 1 :
            # take the mean of the two magnitude spectra
            magnitudeSpectrum0 = self._magnitudeSpectrum(inputbuffers[0])
            magnitudeSpectrum1 = self._magnitudeSpectrum(inputbuffers[1])
            magnitudeSpectrum = (magnitudeSpectrum0 + magnitudeSpectrum1) / 2
        else :
            magnitudeSpectrum = self._magnitudeSpectrum(inputbuffers[0])

        # do the computation
        melSpectrum = self.warpSpectrum(magnitudeSpectrum)
        melCepstrum = self.getMFCCs(melSpectrum,cn=True)

        outputs = FeatureSet()
        outputs[0] = Feature(melCepstrum[self.cnull:])
        outputs[1] = Feature(melSpectrum)
        return outputs

    # process channels separately (stack the returned arrays)
    def process2ch(self,inputbuffers,timestamp):
        '''Compute the features per channel and stack them side by side.'''
        magnitudeSpectrum0 = self._magnitudeSpectrum(inputbuffers[0])
        magnitudeSpectrum1 = self._magnitudeSpectrum(inputbuffers[1])

        # do the computations
        melSpectrum0 = self.warpSpectrum(magnitudeSpectrum0)
        melCepstrum0 = self.getMFCCs(melSpectrum0,cn=True)
        melSpectrum1 = self.warpSpectrum(magnitudeSpectrum1)
        melCepstrum1 = self.getMFCCs(melSpectrum1,cn=True)

        outputs = FeatureSet()
        # NOTE(review): channel 1 is stacked before channel 0 - confirm that
        # this ordering is intended.
        outputs[0] = Feature(numpy.hstack((melCepstrum1[self.cnull:],melCepstrum0[self.cnull:])))
        outputs[1] = Feature(numpy.hstack((melSpectrum1,melSpectrum0)))
        return outputs

    def getRemainingFeatures(self):
        '''Return the Mel filter matrix, one time stamped feature per row.'''
        if not self.update() :
            return []
        frameSampleStart = 0

        output_featureSet = FeatureSet()

        # the filter is the third output (index starts from zero)
        output_featureSet[2] = flist = FeatureList()

        # filterIter is exhausted here; it is recreated by the next update()
        for row in self.filterIter :
            f = Feature()
            f.hasTimestamp = True
            f.timestamp = frame2RealTime(frameSampleStart,self.m_inputSampleRate)
            f.values = row
            flist.append(f)
            frameSampleStart += self.m_stepSize

        return output_featureSet