changeset 52:d56f48aafb99

Updated some example plugins.
author fazekasgy
date Thu, 08 Oct 2009 08:59:08 +0000
parents c1e4f706ca9a
children 7e59caea821b
files Example VamPy plugins/PyMFCC.py Example VamPy plugins/PySpectralCentroid.py Example VamPy plugins/test/PyMFCC_legacy.py
diffstat 3 files changed, 546 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/Example VamPy plugins/PyMFCC.py	Thu Oct 08 08:47:28 2009 +0000
+++ b/Example VamPy plugins/PyMFCC.py	Thu Oct 08 08:59:08 2009 +0000
@@ -329,6 +329,7 @@
 		# do the frequency warping and MFCC computation
 		melSpectrum = self.warpSpectrum(magnitudeSpectrum)
 		melCepstrum = self.getMFCCs(melSpectrum,cn=True)
+		# print melSpectrum,melCepstrum
 		
 		# returning the values: 
 		outputs = FeatureSet()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Example VamPy plugins/PySpectralCentroid.py	Thu Oct 08 08:59:08 2009 +0000
@@ -0,0 +1,144 @@
+'''PySpectralCentroid.py - Example plugin demonstrates 
+how to write a C style plugin using VamPy in pure Python.
+This plugin also introduces the use of the builtin vampy
+extension module.
+
+The plugin has frequency domain input and is using the
+legacy interface: the FFT outpout is passed as a list
+of complex numbers.
+
+Outputs: 
+1) Spectral centroid
+
+Note: This is not the adviced way of writing Vampy plugins now,
+since the interfaces provided for Numpy are at least 5 times
+faster. However, this is still a nice and easy to understand
+example, which also shows how can one write a reasonable
+plugin without having Numpy installed.
+
+Warning: Earlier versions of this plugin are now obsolete.
+(They were using the legacy interface of Vampy 1 which
+did not distinquish between time and frequency domain inputs.)
+
+Centre for Digital Music, Queen Mary University of London.
+Copyright (C) 2009 Gyorgy Fazekas, QMUL. (See Vamp sources 
+for licence information.)
+
+'''
+
+# import the names we use from vampy 
+from vampy import Feature,FeatureSet,ParameterDescriptor
+from vampy import OutputDescriptor,FrequencyDomain,OneSamplePerStep
+
+from math import sqrt
+
+class PySpectralCentroid: 
+	
+	def __init__(self,inputSampleRate): 
+		self.m_imputSampleRate = 0.0 
+		self.m_stepSize = 0
+		self.m_blockSize = 0
+		self.m_channels = 0
+		self.previousSample = 0.0
+		self.m_inputSampleRate = inputSampleRate
+		self.threshold = 0.00
+		
+	def initialise(self,channels,stepSize,blockSize):
+		self.m_channels = channels
+		self.m_stepSize = stepSize		
+		self.m_blockSize = blockSize
+		return True
+	
+	def getMaker(self):
+		return 'Vampy Example Plugins'
+	
+	def getName(self):
+		return 'Spectral Centroid (using legacy process interface)'
+		
+	def getIdentifier(self):
+		return 'vampy-sc3'
+	
+	def getMaxChannelCount(self):
+		return 1
+		
+	def getInputDomain(self):
+		return FrequencyDomain
+			
+	def getOutputDescriptors(self):
+		
+		cod = OutputDescriptor()
+		cod.identifier='vampy-sc3'
+		cod.name='Spectral Centroid'
+		cod.description='Spectral Centroid (Brightness)'
+		cod.unit=''
+		cod.hasFixedBinCount=True
+		cod.binCount=1
+		cod.hasKnownExtents=False
+		cod.isQuantized=True
+		cod.quantizeStep=1.0
+		cod.sampleType=OneSamplePerStep
+		return cod
+
+	def getParameterDescriptors(self):
+		thd = ParameterDescriptor()
+		thd.identifier='threshold'
+		thd.name='Noise threshold'
+		thd.description='Return null or delete this function if not needed.'
+		thd.unit='v'
+		thd.minValue=0.0
+		thd.maxValue=0.5
+		thd.defaultValue=0.05
+		thd.isQuantized=False
+		return thd
+
+	def setParameter(self,paramid,newval):
+		if paramid == 'threshold' :
+			self.threshold = newval
+		return
+		
+	def getParameter(self,paramid):
+		if paramid == 'threshold' :
+			return self.threshold
+		else:
+			return 0.0
+			
+	def process(self,inputbuffers,timestamp):
+		
+		# this is a 1 channel frequency domain plugin, therefore
+		# inputbuffers contain (block size / 2) + 1 complex numbers
+		# corresponding to the FFT output from DC to Nyquist inclusive
+		
+		cplxArray = inputbuffers[0][:-1]
+
+		prev = self.previousSample
+		numLin = 0.0
+		denom = 0.0
+		centroid = 0.0		
+
+		output = FeatureSet()
+
+		pw = 0
+		for i in xrange(1,len(cplxArray)) : 
+			pw = pw + abs(cplxArray[i])
+		
+		if pw > self.threshold : 
+			for i in range(1,(len(cplxArray))) :
+				
+				re = cplxArray[i].real
+				im = cplxArray[i].imag
+				freq = i * self.m_inputSampleRate / self.m_blockSize
+				power = sqrt (re*re + im*im) / (self.m_blockSize/2)
+				denom = denom + power
+				numLin = numLin + freq * power
+				
+			if denom != 0 :
+				centroid = numLin / denom 
+				
+		else :
+			centroid = 0.0
+			
+		output[0] = Feature()
+		output[0].values = centroid
+		output[0].label = str(centroid)
+		
+		return output
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Example VamPy plugins/test/PyMFCC_legacy.py	Thu Oct 08 08:59:08 2009 +0000
@@ -0,0 +1,401 @@
+'''PyMFCC_legacy.py - This example Vampy plugin demonstrates
+how to return sprectrogram-like features.
+
+This plugin has frequency domain input and is using
+the legacy input interface: the frequency samples are
+passed as python list of complex numbers.
+
+Note: This is not the adviced way of writing plugins, 
+since the input interfaces provided for Numpy such as the
+Numpy Array interface (flag: vf_ARRAY) are much faster.
+
+This plugin is using Numpy, but it does not rely on Vampy's
+capability of passing Numpy arrays to the process directly.
+However, it returns Numpy arrays from the process.
+
+Centre for Digital Music, Queen Mary University of London.
+Copyright 2006 Gyorgy Fazekas, QMUL. 
+(See Vamp API for licence information.)
+
+Constants for Mel frequency conversion and filter 
+centre calculation are taken from the GNU GPL licenced 
+Freespeech library. Copyright (C) 1999 Jean-Marc Valin
+'''
+
+import sys,numpy
+from numpy import log,exp,floor,sum
+from numpy import *       
+from numpy.fft import *
+import vampy
+from vampy import *
+
+
+class melScaling(object):
+
+	def __init__(self,sampleRate,inputSize,numBands,minHz = 0,maxHz = None):
+		'''Initialise frequency warping and DCT matrix. 
+		Parameters:
+		sampleRate: audio sample rate
+		inputSize: length of magnitude spectrum (half of FFT size assumed)
+		numBands: number of mel Bands (MFCCs)
+		minHz: lower bound of warping  (default = DC)
+		maxHz: higher bound of warping (default = Nyquist frequency)
+		'''
+		self.sampleRate = sampleRate
+		self.NqHz = sampleRate / 2.0
+		self.minHz = minHz
+		if maxHz is None : maxHz = self.NqHz
+		self.maxHz = maxHz
+		self.inputSize = inputSize
+		self.numBands = numBands
+		self.valid = False
+		self.updated = False
+		
+		
+	def update(self): 
+		# make sure this will run only once if called from a vamp process
+		
+		if self.updated: return self.valid
+		self.updated = True
+		self.valid = False
+		print 'Updating parameters and recalculating filters: '
+		print 'Nyquist: ',self.NqHz
+		
+		if self.maxHz > self.NqHz : 
+			raise Exception('Maximum frequency must be smaller than the Nyquist frequency')
+		
+		self.maxMel = 1000*log(1+self.maxHz/700.0)/log(1+1000.0/700.0)
+		self.minMel = 1000*log(1+self.minHz/700.0)/log(1+1000.0/700.0)
+		print 'minHz:%s\nmaxHz:%s\nminMel:%s\nmaxMel:%s\n' %(self.minHz,self.maxHz,self.minMel,self.maxMel)
+		self.filterMatrix = self.getFilterMatrix(self.inputSize,self.numBands)
+		self.DCTMatrix = self.getDCTMatrix(self.numBands)
+		self.filterIter = self.filterMatrix.__iter__()
+		self.valid = True
+		return self.valid
+		
+		# try :
+		# 	self.maxMel = 1000*log(1+self.maxHz/700.0)/log(1+1000.0/700.0)
+		# 	self.minMel = 1000*log(1+self.minHz/700.0)/log(1+1000.0/700.0)
+		# 	self.filterMatrix = self.getFilterMatrix(self.inputSize,self.numBands)
+		# 	self.DCTMatrix = self.getDCTMatrix(self.numBands)
+		# 	self.filterIter = self.filterMatrix.__iter__()
+		# 	self.valid = True
+		# 	return True
+		# except :
+		# 	print "Invalid parameter setting encountered in MelScaling class."
+		# 	return False
+		# return True
+		
+	def getFilterCentres(self,inputSize,numBands):
+		'''Calculate Mel filter centres around FFT bins.
+		This function calculates two extra bands at the edges for
+		finding the starting and end point of the first and last 
+		actual filters.'''
+		centresMel = numpy.array(xrange(numBands+2)) * (self.maxMel-self.minMel)/(numBands+1) + self.minMel
+		centresBin = numpy.floor(0.5 + 700.0*inputSize*(exp(centresMel*log(1+1000.0/700.0)/1000.0)-1)/self.NqHz)
+		return numpy.array(centresBin,int)
+		
+	def getFilterMatrix(self,inputSize,numBands):
+		'''Compose the Mel scaling matrix.'''
+		filterMatrix = numpy.zeros((numBands,inputSize))
+		self.filterCentres = self.getFilterCentres(inputSize,numBands)
+		for i in xrange(numBands) :
+			start,centre,end = self.filterCentres[i:i+3]
+			self.setFilter(filterMatrix[i],start,centre,end)
+		return filterMatrix.transpose()
+
+	def setFilter(self,filt,filterStart,filterCentre,filterEnd):
+		'''Calculate a single Mel filter.'''
+		k1 = numpy.float32(filterCentre-filterStart)
+		k2 = numpy.float32(filterEnd-filterCentre)
+		up = (numpy.array(xrange(filterStart,filterCentre))-filterStart)/k1
+		dn = (filterEnd-numpy.array(xrange(filterCentre,filterEnd)))/k2
+		filt[filterStart:filterCentre] = up
+		filt[filterCentre:filterEnd] = dn
+						
+	def warpSpectrum(self,magnitudeSpectrum):
+		'''Compute the Mel scaled spectrum.'''
+		return numpy.dot(magnitudeSpectrum,self.filterMatrix)
+		
+	def getDCTMatrix(self,size):
+		'''Calculate the square DCT transform matrix. Results are 
+		equivalent to Matlab dctmtx(n) but with 64 bit precision.'''
+		DCTmx = numpy.array(xrange(size),numpy.float64).repeat(size).reshape(size,size)
+		DCTmxT = numpy.pi * (DCTmx.transpose()+0.5) / size
+		DCTmxT = (1.0/sqrt( size / 2.0)) * cos(DCTmx * DCTmxT)
+		DCTmxT[0] = DCTmxT[0] * (sqrt(2.0)/2.0)
+		return DCTmxT
+		
+	def dct(self,data_matrix):
+		'''Compute DCT of input matrix.'''
+		return numpy.dot(self.DCTMatrix,data_matrix)
+		
+	def getMFCCs(self,warpedSpectrum,cn=True):
+		'''Compute MFCC coefficients from Mel warped magnitude spectrum.'''
+		mfccs=self.dct(numpy.log(warpedSpectrum))
+		if cn is False : mfccs[0] = 0.0
+		return mfccs
+	
+
+class PyMFCC_legacy(melScaling): 
+	
+	def __init__(self,inputSampleRate):
+		
+		# flags for setting some Vampy options
+		self.vampy_flags = vf_DEBUG | vf_REALTIME
+
+		self.m_inputSampleRate = int(inputSampleRate)
+		self.m_stepSize = 512
+		self.m_blockSize = 2048
+		self.m_channels = 1
+		self.numBands = 40
+		self.cnull = 1
+		self.two_ch = False
+		melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize/2,self.numBands)
+		
+	def initialise(self,channels,stepSize,blockSize):
+		self.m_channels = channels
+		self.m_stepSize = stepSize		
+		self.m_blockSize = blockSize
+		self.window = numpy.hamming(blockSize)
+		melScaling.__init__(self,int(self.m_inputSampleRate),self.m_blockSize/2,self.numBands)
+		return True
+	
+	def getMaker(self):
+		return 'Vampy Test Plugins'
+		
+	def getCopyright(self):
+		return 'Plugin By George Fazekas'
+	
+	def getName(self):
+		return 'Vampy Legacy FrequencyDomain MFCC Plugin'
+		
+	def getIdentifier(self):
+		return 'vampy-mfcc-test-legacy'
+
+	def getDescription(self):
+		return 'Vampy FrequencyDomain MFCC Plugin using the Legacy interface.'
+	
+	def getMaxChannelCount(self):
+		return 2
+		
+	def getInputDomain(self):
+		return FrequencyDomain 
+		
+	def getPreferredBlockSize(self):
+		return 2048
+		
+	def getPreferredStepSize(self):
+		return 512
+			
+	def getOutputDescriptors(self):
+		
+		Generic = OutputDescriptor() 
+		Generic.hasFixedBinCount=True
+		Generic.binCount=int(self.numBands)-self.cnull
+		Generic.hasKnownExtents=False
+		Generic.isQuantized=True
+		Generic.sampleType = OneSamplePerStep 
+		
+		# note the inheritance of attributes (use is optional)
+		MFCC = OutputDescriptor(Generic)
+		MFCC.identifier = 'mfccs'
+		MFCC.name = 'MFCCs'
+		MFCC.description = 'MFCC Coefficients'
+		MFCC.binNames=map(lambda x: 'C '+str(x),range(self.cnull,int(self.numBands)))
+		MFCC.unit = None
+		if self.two_ch and self.m_channels == 2 :
+			MFCC.binCount = self.m_channels * (int(self.numBands)-self.cnull)
+		else :
+			MFCC.binCount = self.numBands-self.cnull
+				
+		warpedSpectrum = OutputDescriptor(Generic)
+		warpedSpectrum.identifier='warped-fft'
+		warpedSpectrum.name='Mel Scaled Spectrum'
+		warpedSpectrum.description='Mel Scaled Magnitide Spectrum'
+		warpedSpectrum.unit='Mel'
+		if self.two_ch and self.m_channels == 2 :
+			warpedSpectrum.binCount = self.m_channels * int(self.numBands) 
+		else :
+			warpedSpectrum.binCount = self.numBands
+		
+		melFilter = OutputDescriptor(Generic)
+		melFilter.identifier = 'mel-filter-matrix'
+		melFilter.sampleType='FixedSampleRate'
+		melFilter.sampleRate=self.m_inputSampleRate/self.m_stepSize
+		melFilter.name='Mel Filter Matrix'
+		melFilter.description='Returns the created filter matrix in getRemainingFeatures.'
+		melFilter.unit = None
+				
+		return OutputList(MFCC,warpedSpectrum,melFilter)
+		
+
+	def getParameterDescriptors(self):
+
+		melbands = ParameterDescriptor()
+		melbands.identifier='melbands'
+		melbands.name='Number of bands (coefficients)'
+		melbands.description='Set the number of coefficients.'
+		melbands.unit = ''
+		melbands.minValue = 2
+		melbands.maxValue = 128
+		melbands.defaultValue = 40
+		melbands.isQuantized = True
+		melbands.quantizeStep = 1
+				
+		cnull = ParameterDescriptor()
+		cnull.identifier='cnull'
+		cnull.name='Return C0'
+		cnull.description='Select if the DC coefficient is required.'
+		cnull.unit = None
+		cnull.minValue = 0
+		cnull.maxValue = 1
+		cnull.defaultValue = 0
+		cnull.isQuantized = True
+		cnull.quantizeStep = 1
+		
+		two_ch = ParameterDescriptor(cnull)
+		two_ch.identifier='two_ch'
+		two_ch.name='Process channels separately'
+		two_ch.description='Process two channel files separately.'
+		two_ch.defaultValue = False
+				
+		minHz = ParameterDescriptor()
+		minHz.identifier='minHz'
+		minHz.name='minimum frequency'
+		minHz.description='Set the lower frequency bound.'
+		minHz.unit='Hz'
+		minHz.minValue = 0
+		minHz.maxValue = 24000
+		minHz.defaultValue = 0
+		minHz.isQuantized = True
+		minHz.quantizeStep = 1.0
+		
+		maxHz = ParameterDescriptor()
+		maxHz.identifier='maxHz'
+		maxHz.description='Set the upper frequency bound.'
+		maxHz.name='maximum frequency'
+		maxHz.unit='Hz'
+		maxHz.minValue = 100
+		maxHz.maxValue = 24000
+		maxHz.defaultValue = 11025
+		maxHz.isQuantized = True
+		maxHz.quantizeStep = 100
+		
+		return ParameterList(melbands,minHz,maxHz,cnull,two_ch)
+		
+
+	def setParameter(self,paramid,newval):
+		self.valid = False
+		if paramid == 'minHz' :
+			if newval < self.maxHz and newval < self.NqHz :
+				self.minHz = float(newval)
+			print 'minHz: ', self.minHz
+		if paramid == 'maxHz' :
+			print 'trying to set maxHz to: ',newval
+			if newval < self.NqHz and newval > self.minHz+1000 :
+				self.maxHz = float(newval)
+			else :
+				self.maxHz = self.NqHz
+			print 'set to: ',self.maxHz
+		if paramid == 'cnull' :
+			self.cnull = int(not int(newval))
+		if paramid == 'melbands' :
+			self.numBands = int(newval)
+		if paramid == 'two_ch' :
+			self.two_ch = bool(newval)
+			
+		return 
+				
+	def getParameter(self,paramid):
+		if paramid == 'minHz' :
+			return float(self.minHz)
+		if paramid == 'maxHz' :
+			return float(self.maxHz)
+		if paramid == 'cnull' :
+			return float(not int(self.cnull))
+		if paramid == 'melbands' :
+			return float(self.numBands)
+		if paramid == 'two_ch' :
+			return float(self.two_ch)
+		else:
+			return 0.0
+
+	# set numpy process using the 'use_numpy_interface' flag		
+	def process(self,inputbuffers,timestamp):
+		
+		if not self.update() : return None
+		
+		if self.m_channels == 2 and self.two_ch :
+			return self.process2ch(inputbuffers,timestamp)
+
+		fftsize = self.m_blockSize
+				
+		if self.m_channels > 1 :
+			# take the mean of the two magnitude spectra
+			complexSpectrum0 = array(inputbuffers[0])
+			complexSpectrum1 = array(inputbuffers[1])
+			magnitudeSpectrum0 = abs(complexSpectrum0)[0:fftsize/2]
+			magnitudeSpectrum1 = abs(complexSpectrum1)[0:fftsize/2]
+			magnitudeSpectrum = (magnitudeSpectrum0 + magnitudeSpectrum1) / 2
+		else :
+			complexSpectrum = array(inputbuffers[0])
+			magnitudeSpectrum = abs(complexSpectrum)[0:fftsize/2]
+		
+		# do the computation
+		melSpectrum = self.warpSpectrum(magnitudeSpectrum)
+		melCepstrum = self.getMFCCs(melSpectrum,cn=True)
+		
+		outputs = FeatureSet()
+		outputs[0] = Feature(melCepstrum[self.cnull:])
+		outputs[1] = Feature(melSpectrum)		
+		return outputs
+
+
+	# process channels separately (stack the returned arrays)
+	def process2ch(self,inputbuffers,timestamp):
+
+		fftsize = self.m_blockSize
+		
+		complexSpectrum0 = array(inputbuffers[0])
+		complexSpectrum1 = array(inputbuffers[1])
+		
+		magnitudeSpectrum0 = abs(complexSpectrum0)[0:fftsize/2]
+		magnitudeSpectrum1 = abs(complexSpectrum1)[0:fftsize/2]
+		
+		# do the computations
+		melSpectrum0 = self.warpSpectrum(magnitudeSpectrum0)
+		melCepstrum0 = self.getMFCCs(melSpectrum0,cn=True)
+		melSpectrum1 = self.warpSpectrum(magnitudeSpectrum1)
+		melCepstrum1 = self.getMFCCs(melSpectrum1,cn=True)
+		
+		outputs = FeatureSet()
+		
+		outputs[0] = Feature(hstack((melCepstrum1[self.cnull:],melCepstrum0[self.cnull:])))
+		
+		outputs[1] = Feature(hstack((melSpectrum1,melSpectrum0)))
+		
+		return outputs
+
+
+	def getRemainingFeatures(self):
+		if not self.update() : return []
+		frameSampleStart = 0
+		
+		output_featureSet = FeatureSet()
+
+		# the filter is the third output (index starts from zero)
+		output_featureSet[2] = flist = FeatureList()
+
+		while True:
+			f = Feature()
+			f.hasTimestamp = True
+			f.timestamp = frame2RealTime(frameSampleStart,self.m_inputSampleRate)
+			try :
+				f.values = self.filterIter.next()
+			except StopIteration :
+				break
+			flist.append(f)
+			frameSampleStart += self.m_stepSize
+
+		return output_featureSet