view utils/GmmMetrics.py @ 0:26838b1f560f

initial commit of a segmenter project
author mi tian
date Thu, 02 Apr 2015 18:09:27 +0100
parents
children
line wrap: on
line source
#!/usr/bin/env python
# encoding: utf-8
"""
GmmFeature.py

Created by George Fazekas on 2014-05-30.
Copyright (c) 2014 . All rights reserved.
"""

import sys, os, math, wave, struct, cPickle
from pprint import pprint
from os.path import join, isdir, isfile, abspath, dirname, basename, split, splitext
from itertools import *
import matplotlib.pyplot as plt
from numpy import sum,isnan, log, power, pi, exp, transpose
from numpy.random import rand
import numpy as np
from sklearn.mixture import GMM
from sklearn.metrics.pairwise import pairwise_distances
from scipy.linalg import *
from scipy.io.wavfile import read
# from wavebender import *
from gmmdist import skl_models

outFile = '/Users/mitian/Documents/hg/seg/test/distance.txt'
audioDir = '/Users/mitian/Documents/hg/seg/audio'


class GaussianComponent(object):
	'''GMM representations of data.'''
	__slots__ = ['weight','means','covars']
	
	def __getstate__(self):
		d = {}
		for obj in GaussianComponent.__slots__ :
			d.update({obj:getattr(self,obj)})
		return d
		
	def __setstate__(self,d):
		for k,v in d.iteritems() :
			setattr(self,k,v)

class AudioObj(object):
	'''A class to store generated audio samples.'''
	__slots__ = ['name', 'data']

class FeatureObj(object):
	'''A class to store extracted features for audio samples.'''
	__slots__ = ['audio', 'mean', 'std', 'sc', 'sl']
	
class DistanceObj(object):
	'''A class to store calculated GMM distances.'''
	__slots__ = ['audio1', 'audio2', 'nComponents', 'skl_diag', 'skl_full', 'c2', 'euclidean', 'bhattacharyya']

	def save(self, filename=None):
		'''Save distance objects.'''
		if filename == None:
			filenmae = outFile
		f = open(filenmae,'a+')
		f.write(cPickle.dumps(self.__dict__))
		f.close()
	
class GmmDistance(object):
	'''Calculating distances between two pdfs using different distance metrics.'''
	
	def __init__(self, feature_array, components = 1, save_data=False):
		self.n_components = components
		self.gmm = GMM(n_components = components, covariance_type='full')
		feature_array = np.nan_to_num(feature_array)
		feature_array[np.isinf(feature_array)] = 0.0
		feature_array[np.isnan(feature_array)] = 0.0
		self.model = self.gmm.fit(feature_array)
		if save_data :
			self.data = feature_array
		else :
			self.data = None
		if not self.model.converged_ :
			print "Warning: model fitting did not converge."
		self.means = self.model.means_
		self.covars = self.model.covars_
		self.weights = self.model.weights_	
		self.label = -1 # used only for clustering
		self.components = []	
		for c in range(self.n_components) :
			g = GaussianComponent()
			g.weight = self.weights[c]
			g.means = self.means[c]
			g.covars = self.covars[c]
			self.components.append(g)
			
	def update(self):
		for c in range(self.n_components) :
			self.weights[c] = self.components[c].weight
			self.means[c] = self.components[c].means
			self.covars[c] = self.components[c].covars		
					
	def __str__(self):
		self.update()
		return "GMM object:\nmeans:%(means)s\ncovariances:%(covars)s\nweights%(weights)s" %vars(self)
		
	def kl_div(self, p1, p2):
		'''Compute KL divergence between p1 and p2 with diagonal covariances.'''
		if p1.means is None or p2.means is None:
			return np.finfo(np.float64).max
		# d = ((0.5 * log(p2.covars**2/p1.covars**2)) - 0.5 + p1.covars**2/(2*p2.covars**2) + (abs(p2.means - p1.means)**2) / (2*p2.covars**2))
		d = -2.0 * p1.means.shape[0]
		d += sum( (p1.covars / p2.covars) + (p2.covars / p1.covars) + ((p1.means - p2.means) * ((1.0 / p1.covars) + (1.0 / p2.covars)) * (p1.means - p2.means)), dtype = np.float64 )
		if isnan(d) : 
			return np.finfo(np.float64).max
		return d
		
	def skl_distance_diag(self,other):
		'''Estimate the symmetrised Kullback–Leibler divergence between this and an other model.'''
		# TODO: the components are sorted so this works but still theoretically not quite correct to do this...
		assert (self.n_components == other.n_components), "Number of components must be equal."
		kl12 = kl21 = 0.0
		for i in range(self.n_components) :
			kl12 += self.weights[i] * self.kl_div(self.components[i],other.components[i])
			kl21 += other.weights[i] * self.kl_div(other.components[i],self.components[i])
		return (kl12 + kl21 ) / 2.0
		
	def skl_distance_full(self,other):
		return skl_models(self,other)
	
	def skl_distance_full_orig(self,other):
		'''Estimate the symmetrised Kullback–Leibler divergence between this and an other model.'''
		n = len(self.components) # number of components
		d = len(self.components[0].means) # number of dimensions
		
		ixm = np.ones((n,1),dtype=int).T # selector of mean matrix components
		ixd = range(0,d*d,d+1) # indices of diagonal elements of DxD matrix
		t1 = self.covars.swapaxes(1,2).reshape(d,n*d) # concatenate gmm1 covariance matrices
		t2 = other.covars.swapaxes(1,2).reshape(d,n*d) # concatenate gmm2 covariance matrices
		loopn = xrange(n)

		logdet1 = np.zeros((n,1))
		kl11 = np.zeros((n,n))
		for i in loopn :
			# step 1) precompute log(determinant()) of covariance matrices of gmm1
			logdet1[i] = log(det(self.covars.swapaxes(0,2)[:,:,i]))
		
			# step 2) compute reference kldiv between individual components of gmm1
			inv1 = inv(self.covars.swapaxes(0,2)[:,:,i])
			mm1 = self.means - self.means[i*ixm,:][0]
			b1 = np.dot(inv1,t1).swapaxes(0,1).reshape(n,power(d,2)).T
			kl11[:,i] = 0.5 * ( (logdet1[i]-d-logdet1)[:,0] + sum(b1[ixd,:],0).T + sum(np.dot(mm1,inv1) * mm1,1))
		# print kl11
		
		logdet2 = np.zeros((n,1))
		kl22 = np.zeros((n,n))
		for i in loopn :
			# step 3) precompute log(determinant()) of covariance matrices of gmm2
			logdet2[i] = log(det(other.covars.swapaxes(0,2)[:,:,i]))
			inv2 = inv(other.covars.swapaxes(0,2)[:,:,i])
			mm2 = other.means - other.means[i*ixm,:][0]
			b2 = np.dot(inv2,t2).swapaxes(0,1).reshape(n,power(d,2)).T
			kl22[:,i] = 0.5 * ( (logdet2[i]-d-logdet2)[:,0] + sum(b2[ixd,:],0).T + sum(np.dot(mm2,inv2) * mm2,1))
		
		# step 4) compute pair-wise kldiv between components of gmm1 and gmm2
		kl12 = np.zeros((n,n))
		kl21 = np.zeros((n,n))
		for i in loopn :
			inv1 = inv(self.covars.swapaxes(0,2)[:,:,i])
			inv2 = inv(other.covars.swapaxes(0,2)[:,:,i])
			m12 = self.means - other.means[i*ixm,:][0]
			m21 = other.means - self.means[i*ixm,:][0]
			b1 = np.dot(inv1,t1).swapaxes(0,1).reshape(n,power(d,2)).T
			b2 = np.dot(inv2,t2).swapaxes(0,1).reshape(n,power(d,2)).T
			kl12[:,i] = 0.5 * ( (logdet2[i]-d-logdet1)[:,0] + sum(b2[ixd,:],0).T + sum(np.dot(m12,inv2) * m12,1))
			kl21[:,i] = 0.5 * ( (logdet1[i]-d-logdet2)[:,0] + sum(b1[ixd,:],0).T + sum(np.dot(m21,inv1) * m21,1))
		
		# step 5) compute the final variational distance between gmm1 and gmm2
		kl_full_12 = np.dot(self.weights.T, (log(sum(exp(-kl11)*self.weights,1))) - log(sum(exp(-kl12)*other.weights,1)))
		kl_full_21 = np.dot(other.weights.T, (log(sum(exp(-kl22)*other.weights,1))) - log(sum(exp(-kl21)*self.weights,1)))
		return (kl_full_12 + kl_full_21 ) / 2.0
			
	def emd(self, p1, p2):
		'''Compute earth movers distance between component p1 and p2'''
		
		pass
	
	def c2(self, p1, p2):
		d = power(det(p1.covars), -0.5) * power(det(p2.covars), -0.5)		
		return d
		
	def c2_distance(self, other):
		'''Compute the c2 pdf distance metric
		G. Sfikas etc. An analytic distance metric for Gaussian Mixture Models with application in image retrieval. ICANN 2005.
		'''
		d12 = d11 = d22 = 0.0
		for i in xrange(self.n_components) :
			for j in xrange(self.n_components) :
				V = inv(inv(self.covars[i]) + inv(other.covars[j]))
				K = self.means[i].T.dot(inv(self.covars[i])) * (self.means[i] - other.means[j]) + other.means[j].T.dot(inv(other.covars[j])) * (other.means[j] - self.means[i])
			
				d12 += sum(self.weights[i] * other.weights[j] * power((det(V) / (exp(K) * self.c2(self.components[i], other.components[j]))), 0.5))
				d11 += sum(self.weights[i] * other.weights[j] * power((det(V) / (exp(K) * self.c2(self.components[i], self.components[j]))), 0.5))
				d22 += sum(self.weights[i] * other.weights[j] * power((det(V) / (exp(K) * self.c2(other.components[i], other.components[j]))), 0.5))
			
		dist = -log(2 * d12 / (d11 + d22))
		
		if isnan(dist) : 
			return np.finfo(np.float64).max		
		return dist
			
	def euclidean(self, p1, p2):
		'''Compute euclidean distance between p1 and p2'''
		if p1.means is None or p2.means is None:
			return np.finfo(np.float64).max
		d = sum(power(2 * pi * (power(p1.covars, 2) + power(p2.covars, 2)), -0.5) * exp(-0.5 * power(p1.means - p2.means, 2) / (power(p1.covars, 2) + power(p2.covars, 2))))
			
		if isnan(d) : 
			return np.finfo(np.float64).max
		return d
	
	def euclidean_distance(self, other):
		'''Compute the pdf distance metric'''
		e11 = e22 = e12 = 0.0
		for i in range(self.n_components) :
			e11 += self.weights[i] * self.weights[i] * self.euclidean(self.components[i],self.components[i])
			e22 += other.weights[i] * other.weights[i] * self.euclidean(other.components[i],other.components[i])
			e12 += self.weights[i] * other.weights[i] * self.euclidean(self.components[i],other.components[i])
			
		dist = e11 + e22 - 2 * e12

		if isnan(dist) : 
			return np.finfo(np.float64).max		
		return dist
		
	def bhattacharyya(self, p1, p2):
		'''Compute the Bhattacharyya based distance following:
		K. Fukunaga. Introduction to statistical pattern recognition. Academic Press 1990
		'''
		B_dev = 0.125 * ((p1.means - p2.means).T.dot( inv((p1.covars + p2.covars) / 2))).dot(p1.means - p2.means)\
		 + 0.5 * log( abs(inv((p1.covars + p2.covars) / 2)) / power(abs(inv(p1.covars) * inv(p2.covars)), 0.5) )
		
		d = sum(B_dev)
		if isnan(d) : 
			return np.finfo(np.float64).max
		return d
	
	def bhattacharyya_distance(self, other):
		'''Compute the pdf distance metric'''
		dist = 0.0
		for i in xrange(self.n_components) :
			dist += self.weights[i] * other.weights[i] *self.bhattacharyya(self.components[i], other.components[i])
		return dist


class AudioGenerator(object):
	'''Generate simple audio data (sinusoidal etc. and noise and their combinations).'''	

	def sine(self, freq=440.0, framerate=44100, amplitude=0.5, length=0.02):
	    n = int(length * framerate)
	    return np.array([amplitude * math.sin(2.0*math.pi * float(freq) * (float(i)/float(framerate))) for i in xrange(n)])

	def noise(self, framerate=44100, amplitude=0.1, length=0.02):
	    n = int(length * framerate)
	    return np.array([float(amplitude) * np.random.uniform(-1, 1) for i in xrange(n)])
	
	def square(self, freq=440.0, framerate=44100, amplitude=0.5, length=0.02):
		for s in self.sine(freq, framerate, amplitude, length):
			if s > 0:
				yield amplitude
			elif s < 0:
				yield -amplitude
			else:
				yield 0.0
	
	def damped(self, freq=440.0, framerate=44100, amplitude=0.5, length=0.02):
		n = int(length*framerate)
		return (exp(-(float(i%n)/float(framerate))) * s for i, s in enumerate(self.sine(frequency, framerate, amplitude, length))) 
	
	def write_wav_file(self, samples,filename) :
	    '''Create a wav file and write it to disk.'''
	    nchannels, sampwidth, framerate, nframes = 1, 2, 44100, len(samples)
	    max_amplitude = 32767.0
	    w = wave.open(filename, 'w')
	    w.setparams((nchannels, sampwidth, framerate, nframes, 'NONE', 'not compressed'))
	    frames = str().join(struct.pack('h', int(max_amplitude * s)) for s in samples)
	    w.writeframesraw(frames)
	    w.close()
	    print "wav file: %s written." %filename

class FeatureExtractor(object):
	'''Extract low level features of input audio samples for compare distance computation.'''	
def main():
	
	# ag = AudioGenerator()
	# fe = FeatureExtractor()
	# 
	# # Create a "test1.wav" file which is 10s length and contains two sinusoids + noise
	# length = 10.0 # unit = sG
	# samples = ag.sine(440.0,length=length) + ag.sine(240.0,length=length) + ag.noise(length=length)
	# samples = samples / (max(samples) + 0.05)
	# ag.write_wav_file(samples, "audio/test1.wav")
	# # Create a file indentical to 'test1.wav
	# ag.write_wav_file(samples,"audio/test1a.wav")
	# # Create a file with the same componentes as "test1.wav" but the freq of one sinusouid different
	# samples = ag.sine(440.0,length=length) + ag.sine(480.0,length=length) + ag.noise(length=length)
	# samples = samples / (max(samples) + 0.05)
	# ag.write_wav_file(samples, "audio/test1b.wav")
	# # Create a file with the same componentes as "test1.wav" but the freq of both sinusouids different
	# samples = ag.sine(880.0,length=length) + ag.sine(480.0,length=length) + ag.noise(length=length)
	# samples = samples / (max(samples) + 0.05)
	# ag.write_wav_file(samples, "audio/test1c.wav")
	# 
	# # Create a file with one more sinusouid componentes as "test1.wav" 
	# samples = ag.sine(440.0,length=length) + ag.sine(240.0,length=length) + ag.sine(880.0,length=length) + ag.noise(length=length)
	# samples = samples / (max(samples)+0.05)
	# ag.write_wav_file(samples, "audio/test2a.wav")
	# # Create a file with changed freq and one more sinusouid componentes as "test1.wav" 
	# samples = ag.sine(440.0,length=length) + ag.sine(240.0,length=length) + ag.sine(1320.0,length=length) + ag.noise(length=length)
	# samples = samples / (max(samples)+0.05)
	# ag.write_wav_file(samples, "audio/test2b.wav")
	# 
	# # Create a file with longer length than "test1.wav" 
	# samples = ag.sine(440.0,length=15) + ag.sine(240.0,length=15) + ag.noise(length=15)
	# samples = samples / (max(samples)+0.05)
	# ag.write_wav_file(samples, "audio/test3a.wav")
	# # Create a file with longer length and one more sinusoid than "test1.wav" 
	# samples = ag.sine(440.0,length=15) + ag.sine(240.0,length=15) + ag.sine(880.0,length=15) + ag.noise(length=15)
	# samples = samples / (max(samples)+0.05)
	# ag.write_wav_file(samples, "audio/test3b.wav")
	
	
	# plt.plot(samples[:1000])
	# plt.show()
	
	
	# print "Testing Gaussian Feature: Generating random features."
	feature_array_1 = np.zeros((20,100),dtype=np.float64)
	feature_array_2 = np.zeros((20,100),dtype=np.float64)
	
	r = rand(2)
	
	for x in xrange(100) :
		feature_array_1[:,x] = rand(20) + r[0]
		feature_array_2[:,x] = rand(20) + r[1]
	
	f1 = GmmDistance(feature_array_1)
	f2 = GmmDistance(feature_array_2)
	
	print f1,f2,"\n"
	
	print "KL2 distance between f1-f2 using diag covariance:", f1.skl_distance_diag(f2)
	print "KL2 distance between f1-f1 using diag covariance:", f1.skl_distance_diag(f1)
	print "KL2 distance between f2-f2 using diag covariance:", f2.skl_distance_diag(f2)
	print "KL2 distance between f2-f1 using diag covariance:", f2.skl_distance_diag(f1)
	print '\n'
	print "KL2 distance between f1-f2 using full covariance:", f1.skl_distance_full(f2)
	print "KL2 distance between f1-f1 using full covariance:", f1.skl_distance_full(f1)
	print "KL2 distance between f2-f2 using full covariance:", f2.skl_distance_full(f2)
	print "KL2 distance between f2-f1 using full covariance:", f2.skl_distance_full(f1)
	print '\n'
	print "c2 distance between f1-f2:", f1.c2_distance(f2)
	print "c2 distance between f1-f1:", f1.c2_distance(f1)
	print "c2 distance between f2-f2:", f2.c2_distance(f2)
	print "c2 distance between f2-f1:", f2.c2_distance(f1)
	print '\n'
	print "euclidean distance between f1-f2:", f1.euclidean_distance(f2)
	print "euclidean distance between f1-f1:", f1.euclidean_distance(f1)
	print "euclidean distance between f2-f2:", f2.euclidean_distance(f2)
	print "euclidean distance between f2-f1:", f2.euclidean_distance(f1)
	print '\n'
	print "bhattacharyya distance between f1-f2:", f1.bhattacharyya_distance(f2)
	print "bhattacharyya distance between f1-f1:", f1.bhattacharyya_distance(f1)
	print "bhattacharyya distance between f2-f2:", f2.bhattacharyya_distance(f2)
	print "bhattacharyya distance between f2-f1:", f2.bhattacharyya_distance(f1)
	
	# ag = AudioGenerator()
	# sound = ag.synthesisor(nChannels = 2, framerate=44100, amplitude=0.5, length=0.02)
	# ag.write_wav_file(samples=sound, nframes=None, length=0.02, filename='audio/test.wav')
	# 
	# l = dir(__builtins__)
	# d = __builtins__.__dict__
	# pprint(l)
	# pprint(d)
	
	# # Load audio data
	# audio_files = [i for i in os.listdir(audioDir) if i.endswith('wav') and not i.startswith('.')]
	# audio_list = []
	# for i in audio_files:
	#	ao = AudioObj()
	#	ao.name = splitext(i)[0]
	#	ao.data = np.array(read(join(audioDir, i))[1],	dtype=float)
	#	audio_list.append(ao)
	#	
	# # Calculate pairwise ditances between audio data using listed metrics
	# res_list = []
	# for a1, a2 in combinations(audio_list, 2):
	#	# basic info
	#	res = DistanceObj()
	#	f1 = GmmDistance(a1.data)
	#	f2 = GmmDistance(a2.data)
	#	res.audio1 = a1.name
	#	res.audio2 = a2.name
	#	res.nComponents = len(f1.components)
	#	
	#	# distances between two samples should be symmetric
	#	res.skl_full = f1.skl_distance_full(f2)
	#	res.skl_diag = f1.skl_distance_diag(f2)
	#	res.c2 = f1.c2_distance(f2)
	#	res.euclidean = f1.euclidean_distance(f2)
	#	res.bhattacharyya = f1.bhattacharyya_distance(f2)
	#	
	#	res.save()
	#	res_list.append(res)
		
		
	
if __name__ == '__main__':
	main()