Mercurial > hg > segmentation
view utils/GmmMetrics.py @ 0:26838b1f560f
initial commit of a segmenter project
author | mi tian |
---|---|
date | Thu, 02 Apr 2015 18:09:27 +0100 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python # encoding: utf-8 """ GmmFeature.py Created by George Fazekas on 2014-05-30. Copyright (c) 2014 . All rights reserved. """ import sys, os, math, wave, struct, cPickle from pprint import pprint from os.path import join, isdir, isfile, abspath, dirname, basename, split, splitext from itertools import * import matplotlib.pyplot as plt from numpy import sum,isnan, log, power, pi, exp, transpose from numpy.random import rand import numpy as np from sklearn.mixture import GMM from sklearn.metrics.pairwise import pairwise_distances from scipy.linalg import * from scipy.io.wavfile import read # from wavebender import * from gmmdist import skl_models outFile = '/Users/mitian/Documents/hg/seg/test/distance.txt' audioDir = '/Users/mitian/Documents/hg/seg/audio' class GaussianComponent(object): '''GMM representations of data.''' __slots__ = ['weight','means','covars'] def __getstate__(self): d = {} for obj in GaussianComponent.__slots__ : d.update({obj:getattr(self,obj)}) return d def __setstate__(self,d): for k,v in d.iteritems() : setattr(self,k,v) class AudioObj(object): '''A class to store generated audio samples.''' __slots__ = ['name', 'data'] class FeatureObj(object): '''A class to store extracted features for audio samples.''' __slots__ = ['audio', 'mean', 'std', 'sc', 'sl'] class DistanceObj(object): '''A class to store calculated GMM distances.''' __slots__ = ['audio1', 'audio2', 'nComponents', 'skl_diag', 'skl_full', 'c2', 'euclidean', 'bhattacharyya'] def save(self, filename=None): '''Save distance objects.''' if filename == None: filenmae = outFile f = open(filenmae,'a+') f.write(cPickle.dumps(self.__dict__)) f.close() class GmmDistance(object): '''Calculating distances between two pdfs using different distance metrics.''' def __init__(self, feature_array, components = 1, save_data=False): self.n_components = components self.gmm = GMM(n_components = components, covariance_type='full') feature_array = np.nan_to_num(feature_array) feature_array[np.isinf(feature_array)] = 0.0 feature_array[np.isnan(feature_array)] = 0.0 self.model = self.gmm.fit(feature_array) if save_data : self.data = feature_array else : self.data = None if not self.model.converged_ : print "Warning: model fitting did not converge." self.means = self.model.means_ self.covars = self.model.covars_ self.weights = self.model.weights_ self.label = -1 # used only for clustering self.components = [] for c in range(self.n_components) : g = GaussianComponent() g.weight = self.weights[c] g.means = self.means[c] g.covars = self.covars[c] self.components.append(g) def update(self): for c in range(self.n_components) : self.weights[c] = self.components[c].weight self.means[c] = self.components[c].means self.covars[c] = self.components[c].covars def __str__(self): self.update() return "GMM object:\nmeans:%(means)s\ncovariances:%(covars)s\nweights%(weights)s" %vars(self) def kl_div(self, p1, p2): '''Compute KL divergence between p1 and p2 with diagonal covariances.''' if p1.means is None or p2.means is None: return np.finfo(np.float64).max # d = ((0.5 * log(p2.covars**2/p1.covars**2)) - 0.5 + p1.covars**2/(2*p2.covars**2) + (abs(p2.means - p1.means)**2) / (2*p2.covars**2)) d = -2.0 * p1.means.shape[0] d += sum( (p1.covars / p2.covars) + (p2.covars / p1.covars) + ((p1.means - p2.means) * ((1.0 / p1.covars) + (1.0 / p2.covars)) * (p1.means - p2.means)), dtype = np.float64 ) if isnan(d) : return np.finfo(np.float64).max return d def skl_distance_diag(self,other): '''Estimate the symmetrised Kullback–Leibler divergence between this and an other model.''' # TODO: the components are sorted so this works but still theoretically not quite correct to do this... assert (self.n_components == other.n_components), "Number of components must be equal." kl12 = kl21 = 0.0 for i in range(self.n_components) : kl12 += self.weights[i] * self.kl_div(self.components[i],other.components[i]) kl21 += other.weights[i] * self.kl_div(other.components[i],self.components[i]) return (kl12 + kl21 ) / 2.0 def skl_distance_full(self,other): return skl_models(self,other) def skl_distance_full_orig(self,other): '''Estimate the symmetrised Kullback–Leibler divergence between this and an other model.''' n = len(self.components) # number of components d = len(self.components[0].means) # number of dimensions ixm = np.ones((n,1),dtype=int).T # selector of mean matrix components ixd = range(0,d*d,d+1) # indices of diagonal elements of DxD matrix t1 = self.covars.swapaxes(1,2).reshape(d,n*d) # concatenate gmm1 covariance matrices t2 = other.covars.swapaxes(1,2).reshape(d,n*d) # concatenate gmm2 covariance matrices loopn = xrange(n) logdet1 = np.zeros((n,1)) kl11 = np.zeros((n,n)) for i in loopn : # step 1) precompute log(determinant()) of covariance matrices of gmm1 logdet1[i] = log(det(self.covars.swapaxes(0,2)[:,:,i])) # step 2) compute reference kldiv between individual components of gmm1 inv1 = inv(self.covars.swapaxes(0,2)[:,:,i]) mm1 = self.means - self.means[i*ixm,:][0] b1 = np.dot(inv1,t1).swapaxes(0,1).reshape(n,power(d,2)).T kl11[:,i] = 0.5 * ( (logdet1[i]-d-logdet1)[:,0] + sum(b1[ixd,:],0).T + sum(np.dot(mm1,inv1) * mm1,1)) # print kl11 logdet2 = np.zeros((n,1)) kl22 = np.zeros((n,n)) for i in loopn : # step 3) precompute log(determinant()) of covariance matrices of gmm2 logdet2[i] = log(det(other.covars.swapaxes(0,2)[:,:,i])) inv2 = inv(other.covars.swapaxes(0,2)[:,:,i]) mm2 = other.means - other.means[i*ixm,:][0] b2 = np.dot(inv2,t2).swapaxes(0,1).reshape(n,power(d,2)).T kl22[:,i] = 0.5 * ( (logdet2[i]-d-logdet2)[:,0] + sum(b2[ixd,:],0).T + sum(np.dot(mm2,inv2) * mm2,1)) # step 4) compute pair-wise kldiv between components of gmm1 and gmm2 kl12 = np.zeros((n,n)) kl21 = np.zeros((n,n)) for i in loopn : inv1 = inv(self.covars.swapaxes(0,2)[:,:,i]) inv2 = inv(other.covars.swapaxes(0,2)[:,:,i]) m12 = self.means - other.means[i*ixm,:][0] m21 = other.means - self.means[i*ixm,:][0] b1 = np.dot(inv1,t1).swapaxes(0,1).reshape(n,power(d,2)).T b2 = np.dot(inv2,t2).swapaxes(0,1).reshape(n,power(d,2)).T kl12[:,i] = 0.5 * ( (logdet2[i]-d-logdet1)[:,0] + sum(b2[ixd,:],0).T + sum(np.dot(m12,inv2) * m12,1)) kl21[:,i] = 0.5 * ( (logdet1[i]-d-logdet2)[:,0] + sum(b1[ixd,:],0).T + sum(np.dot(m21,inv1) * m21,1)) # step 5) compute the final variational distance between gmm1 and gmm2 kl_full_12 = np.dot(self.weights.T, (log(sum(exp(-kl11)*self.weights,1))) - log(sum(exp(-kl12)*other.weights,1))) kl_full_21 = np.dot(other.weights.T, (log(sum(exp(-kl22)*other.weights,1))) - log(sum(exp(-kl21)*self.weights,1))) return (kl_full_12 + kl_full_21 ) / 2.0 def emd(self, p1, p2): '''Compute earth movers distance between component p1 and p2''' pass def c2(self, p1, p2): d = power(det(p1.covars), -0.5) * power(det(p2.covars), -0.5) return d def c2_distance(self, other): '''Compute the c2 pdf distance metric G. Sfikas etc. An analytic distance metric for Gaussian Mixture Models with application in image retrieval. ICANN 2005. ''' d12 = d11 = d22 = 0.0 for i in xrange(self.n_components) : for j in xrange(self.n_components) : V = inv(inv(self.covars[i]) + inv(other.covars[j])) K = self.means[i].T.dot(inv(self.covars[i])) * (self.means[i] - other.means[j]) + other.means[j].T.dot(inv(other.covars[j])) * (other.means[j] - self.means[i]) d12 += sum(self.weights[i] * other.weights[j] * power((det(V) / (exp(K) * self.c2(self.components[i], other.components[j]))), 0.5)) d11 += sum(self.weights[i] * other.weights[j] * power((det(V) / (exp(K) * self.c2(self.components[i], self.components[j]))), 0.5)) d22 += sum(self.weights[i] * other.weights[j] * power((det(V) / (exp(K) * self.c2(other.components[i], other.components[j]))), 0.5)) dist = -log(2 * d12 / (d11 + d22)) if isnan(dist) : return np.finfo(np.float64).max return dist def euclidean(self, p1, p2): '''Compute euclidean distance between p1 and p2''' if p1.means is None or p2.means is None: return np.finfo(np.float64).max d = sum(power(2 * pi * (power(p1.covars, 2) + power(p2.covars, 2)), -0.5) * exp(-0.5 * power(p1.means - p2.means, 2) / (power(p1.covars, 2) + power(p2.covars, 2)))) if isnan(d) : return np.finfo(np.float64).max return d def euclidean_distance(self, other): '''Compute the pdf distance metric''' e11 = e22 = e12 = 0.0 for i in range(self.n_components) : e11 += self.weights[i] * self.weights[i] * self.euclidean(self.components[i],self.components[i]) e22 += other.weights[i] * other.weights[i] * self.euclidean(other.components[i],other.components[i]) e12 += self.weights[i] * other.weights[i] * self.euclidean(self.components[i],other.components[i]) dist = e11 + e22 - 2 * e12 if isnan(dist) : return np.finfo(np.float64).max return dist def bhattacharyya(self, p1, p2): '''Compute the Bhattacharyya based distance following: K. Fukunaga. Introduction to statistical pattern recognition. Academic Press 1990 ''' B_dev = 0.125 * ((p1.means - p2.means).T.dot( inv((p1.covars + p2.covars) / 2))).dot(p1.means - p2.means)\ + 0.5 * log( abs(inv((p1.covars + p2.covars) / 2)) / power(abs(inv(p1.covars) * inv(p2.covars)), 0.5) ) d = sum(B_dev) if isnan(d) : return np.finfo(np.float64).max return d def bhattacharyya_distance(self, other): '''Compute the pdf distance metric''' dist = 0.0 for i in xrange(self.n_components) : dist += self.weights[i] * other.weights[i] *self.bhattacharyya(self.components[i], other.components[i]) return dist class AudioGenerator(object): '''Generate simple audio data (sinusoidal etc. and noise and their combinations).''' def sine(self, freq=440.0, framerate=44100, amplitude=0.5, length=0.02): n = int(length * framerate) return np.array([amplitude * math.sin(2.0*math.pi * float(freq) * (float(i)/float(framerate))) for i in xrange(n)]) def noise(self, framerate=44100, amplitude=0.1, length=0.02): n = int(length * framerate) return np.array([float(amplitude) * np.random.uniform(-1, 1) for i in xrange(n)]) def square(self, freq=440.0, framerate=44100, amplitude=0.5, length=0.02): for s in self.sine(freq, framerate, amplitude, length): if s > 0: yield amplitude elif s < 0: yield -amplitude else: yield 0.0 def damped(self, freq=440.0, framerate=44100, amplitude=0.5, length=0.02): n = int(length*framerate) return (exp(-(float(i%n)/float(framerate))) * s for i, s in enumerate(self.sine(frequency, framerate, amplitude, length))) def write_wav_file(self, samples,filename) : '''Create a wav file and write it to disk.''' nchannels, sampwidth, framerate, nframes = 1, 2, 44100, len(samples) max_amplitude = 32767.0 w = wave.open(filename, 'w') w.setparams((nchannels, sampwidth, framerate, nframes, 'NONE', 'not compressed')) frames = str().join(struct.pack('h', int(max_amplitude * s)) for s in samples) w.writeframesraw(frames) w.close() print "wav file: %s written." %filename class FeatureExtractor(object): '''Extract low level features of input audio samples for compare distance computation.''' def main(): # ag = AudioGenerator() # fe = FeatureExtractor() # # # Create a "test1.wav" file which is 10s length and contains two sinusoids + noise # length = 10.0 # unit = sG # samples = ag.sine(440.0,length=length) + ag.sine(240.0,length=length) + ag.noise(length=length) # samples = samples / (max(samples) + 0.05) # ag.write_wav_file(samples, "audio/test1.wav") # # Create a file indentical to 'test1.wav # ag.write_wav_file(samples,"audio/test1a.wav") # # Create a file with the same componentes as "test1.wav" but the freq of one sinusouid different # samples = ag.sine(440.0,length=length) + ag.sine(480.0,length=length) + ag.noise(length=length) # samples = samples / (max(samples) + 0.05) # ag.write_wav_file(samples, "audio/test1b.wav") # # Create a file with the same componentes as "test1.wav" but the freq of both sinusouids different # samples = ag.sine(880.0,length=length) + ag.sine(480.0,length=length) + ag.noise(length=length) # samples = samples / (max(samples) + 0.05) # ag.write_wav_file(samples, "audio/test1c.wav") # # # Create a file with one more sinusouid componentes as "test1.wav" # samples = ag.sine(440.0,length=length) + ag.sine(240.0,length=length) + ag.sine(880.0,length=length) + ag.noise(length=length) # samples = samples / (max(samples)+0.05) # ag.write_wav_file(samples, "audio/test2a.wav") # # Create a file with changed freq and one more sinusouid componentes as "test1.wav" # samples = ag.sine(440.0,length=length) + ag.sine(240.0,length=length) + ag.sine(1320.0,length=length) + ag.noise(length=length) # samples = samples / (max(samples)+0.05) # ag.write_wav_file(samples, "audio/test2b.wav") # # # Create a file with longer length than "test1.wav" # samples = ag.sine(440.0,length=15) + ag.sine(240.0,length=15) + ag.noise(length=15) # samples = samples / (max(samples)+0.05) # ag.write_wav_file(samples, "audio/test3a.wav") # # Create a file with longer length and one more sinusoid than "test1.wav" # samples = ag.sine(440.0,length=15) + ag.sine(240.0,length=15) + ag.sine(880.0,length=15) + ag.noise(length=15) # samples = samples / (max(samples)+0.05) # ag.write_wav_file(samples, "audio/test3b.wav") # plt.plot(samples[:1000]) # plt.show() # print "Testing Gaussian Feature: Generating random features." feature_array_1 = np.zeros((20,100),dtype=np.float64) feature_array_2 = np.zeros((20,100),dtype=np.float64) r = rand(2) for x in xrange(100) : feature_array_1[:,x] = rand(20) + r[0] feature_array_2[:,x] = rand(20) + r[1] f1 = GmmDistance(feature_array_1) f2 = GmmDistance(feature_array_2) print f1,f2,"\n" print "KL2 distance between f1-f2 using diag covariance:", f1.skl_distance_diag(f2) print "KL2 distance between f1-f1 using diag covariance:", f1.skl_distance_diag(f1) print "KL2 distance between f2-f2 using diag covariance:", f2.skl_distance_diag(f2) print "KL2 distance between f2-f1 using diag covariance:", f2.skl_distance_diag(f1) print '\n' print "KL2 distance between f1-f2 using full covariance:", f1.skl_distance_full(f2) print "KL2 distance between f1-f1 using full covariance:", f1.skl_distance_full(f1) print "KL2 distance between f2-f2 using full covariance:", f2.skl_distance_full(f2) print "KL2 distance between f2-f1 using full covariance:", f2.skl_distance_full(f1) print '\n' print "c2 distance between f1-f2:", f1.c2_distance(f2) print "c2 distance between f1-f1:", f1.c2_distance(f1) print "c2 distance between f2-f2:", f2.c2_distance(f2) print "c2 distance between f2-f1:", f2.c2_distance(f1) print '\n' print "euclidean distance between f1-f2:", f1.euclidean_distance(f2) print "euclidean distance between f1-f1:", f1.euclidean_distance(f1) print "euclidean distance between f2-f2:", f2.euclidean_distance(f2) print "euclidean distance between f2-f1:", f2.euclidean_distance(f1) print '\n' print "bhattacharyya distance between f1-f2:", f1.bhattacharyya_distance(f2) print "bhattacharyya distance between f1-f1:", f1.bhattacharyya_distance(f1) print "bhattacharyya distance between f2-f2:", f2.bhattacharyya_distance(f2) print "bhattacharyya distance between f2-f1:", f2.bhattacharyya_distance(f1) # ag = AudioGenerator() # sound = ag.synthesisor(nChannels = 2, framerate=44100, amplitude=0.5, length=0.02) # ag.write_wav_file(samples=sound, nframes=None, length=0.02, filename='audio/test.wav') # # l = dir(__builtins__) # d = __builtins__.__dict__ # pprint(l) # pprint(d) # # Load audio data # audio_files = [i for i in os.listdir(audioDir) if i.endswith('wav') and not i.startswith('.')] # audio_list = [] # for i in audio_files: # ao = AudioObj() # ao.name = splitext(i)[0] # ao.data = np.array(read(join(audioDir, i))[1], dtype=float) # audio_list.append(ao) # # # Calculate pairwise ditances between audio data using listed metrics # res_list = [] # for a1, a2 in combinations(audio_list, 2): # # basic info # res = DistanceObj() # f1 = GmmDistance(a1.data) # f2 = GmmDistance(a2.data) # res.audio1 = a1.name # res.audio2 = a2.name # res.nComponents = len(f1.components) # # # distances between two samples should be symmetric # res.skl_full = f1.skl_distance_full(f2) # res.skl_diag = f1.skl_distance_diag(f2) # res.c2 = f1.c2_distance(f2) # res.euclidean = f1.euclidean_distance(f2) # res.bhattacharyya = f1.bhattacharyya_distance(f2) # # res.save() # res_list.append(res) if __name__ == '__main__': main()