mi@0: #!/usr/bin/env python mi@0: # encoding: utf-8 mi@0: """ mi@0: GmmFeature.py mi@0: mi@0: Created by George Fazekas on 2014-05-30. mi@0: Copyright (c) 2014 . All rights reserved. mi@0: """ mi@0: mi@0: import sys, os, math, wave, struct, cPickle mi@0: from pprint import pprint mi@0: from os.path import join, isdir, isfile, abspath, dirname, basename, split, splitext mi@0: from itertools import * mi@0: import matplotlib.pyplot as plt mi@0: from numpy import sum,isnan, log, power, pi, exp, transpose mi@0: from numpy.random import rand mi@0: import numpy as np mi@0: from sklearn.mixture import GMM mi@0: from sklearn.metrics.pairwise import pairwise_distances mi@0: from scipy.linalg import * mi@0: from scipy.io.wavfile import read mi@0: # from wavebender import * mi@0: from gmmdist import skl_models mi@0: mi@0: outFile = '/Users/mitian/Documents/hg/seg/test/distance.txt' mi@0: audioDir = '/Users/mitian/Documents/hg/seg/audio' mi@0: mi@0: mi@0: class GaussianComponent(object): mi@0: '''GMM representations of data.''' mi@0: __slots__ = ['weight','means','covars'] mi@0: mi@0: def __getstate__(self): mi@0: d = {} mi@0: for obj in GaussianComponent.__slots__ : mi@0: d.update({obj:getattr(self,obj)}) mi@0: return d mi@0: mi@0: def __setstate__(self,d): mi@0: for k,v in d.iteritems() : mi@0: setattr(self,k,v) mi@0: mi@0: class AudioObj(object): mi@0: '''A class to store generated audio samples.''' mi@0: __slots__ = ['name', 'data'] mi@0: mi@0: class FeatureObj(object): mi@0: '''A class to store extracted features for audio samples.''' mi@0: __slots__ = ['audio', 'mean', 'std', 'sc', 'sl'] mi@0: mi@0: class DistanceObj(object): mi@0: '''A class to store calculated GMM distances.''' mi@0: __slots__ = ['audio1', 'audio2', 'nComponents', 'skl_diag', 'skl_full', 'c2', 'euclidean', 'bhattacharyya'] mi@0: mi@0: def save(self, filename=None): mi@0: '''Save distance objects.''' mi@0: if filename == None: mi@0: filenmae = outFile mi@0: f = open(filenmae,'a+') mi@0: f.write(cPickle.dumps(self.__dict__)) mi@0: f.close() mi@0: mi@0: class GmmDistance(object): mi@0: '''Calculating distances between two pdfs using different distance metrics.''' mi@0: mi@0: def __init__(self, feature_array, components = 1, save_data=False): mi@0: self.n_components = components mi@0: self.gmm = GMM(n_components = components, covariance_type='full') mi@0: feature_array = np.nan_to_num(feature_array) mi@0: feature_array[np.isinf(feature_array)] = 0.0 mi@0: feature_array[np.isnan(feature_array)] = 0.0 mi@0: self.model = self.gmm.fit(feature_array) mi@0: if save_data : mi@0: self.data = feature_array mi@0: else : mi@0: self.data = None mi@0: if not self.model.converged_ : mi@0: print "Warning: model fitting did not converge." mi@0: self.means = self.model.means_ mi@0: self.covars = self.model.covars_ mi@0: self.weights = self.model.weights_ mi@0: self.label = -1 # used only for clustering mi@0: self.components = [] mi@0: for c in range(self.n_components) : mi@0: g = GaussianComponent() mi@0: g.weight = self.weights[c] mi@0: g.means = self.means[c] mi@0: g.covars = self.covars[c] mi@0: self.components.append(g) mi@0: mi@0: def update(self): mi@0: for c in range(self.n_components) : mi@0: self.weights[c] = self.components[c].weight mi@0: self.means[c] = self.components[c].means mi@0: self.covars[c] = self.components[c].covars mi@0: mi@0: def __str__(self): mi@0: self.update() mi@0: return "GMM object:\nmeans:%(means)s\ncovariances:%(covars)s\nweights%(weights)s" %vars(self) mi@0: mi@0: def kl_div(self, p1, p2): mi@0: '''Compute KL divergence between p1 and p2 with diagonal covariances.''' mi@0: if p1.means is None or p2.means is None: mi@0: return np.finfo(np.float64).max mi@0: # d = ((0.5 * log(p2.covars**2/p1.covars**2)) - 0.5 + p1.covars**2/(2*p2.covars**2) + (abs(p2.means - p1.means)**2) / (2*p2.covars**2)) mi@0: d = -2.0 * p1.means.shape[0] mi@0: d += sum( (p1.covars / p2.covars) + (p2.covars / p1.covars) + ((p1.means - p2.means) * ((1.0 / p1.covars) + (1.0 / p2.covars)) * (p1.means - p2.means)), dtype = np.float64 ) mi@0: if isnan(d) : mi@0: return np.finfo(np.float64).max mi@0: return d mi@0: mi@0: def skl_distance_diag(self,other): mi@0: '''Estimate the symmetrised Kullback–Leibler divergence between this and an other model.''' mi@0: # TODO: the components are sorted so this works but still theoretically not quite correct to do this... mi@0: assert (self.n_components == other.n_components), "Number of components must be equal." mi@0: kl12 = kl21 = 0.0 mi@0: for i in range(self.n_components) : mi@0: kl12 += self.weights[i] * self.kl_div(self.components[i],other.components[i]) mi@0: kl21 += other.weights[i] * self.kl_div(other.components[i],self.components[i]) mi@0: return (kl12 + kl21 ) / 2.0 mi@0: mi@0: def skl_distance_full(self,other): mi@0: return skl_models(self,other) mi@0: mi@0: def skl_distance_full_orig(self,other): mi@0: '''Estimate the symmetrised Kullback–Leibler divergence between this and an other model.''' mi@0: n = len(self.components) # number of components mi@0: d = len(self.components[0].means) # number of dimensions mi@0: mi@0: ixm = np.ones((n,1),dtype=int).T # selector of mean matrix components mi@0: ixd = range(0,d*d,d+1) # indices of diagonal elements of DxD matrix mi@0: t1 = self.covars.swapaxes(1,2).reshape(d,n*d) # concatenate gmm1 covariance matrices mi@0: t2 = other.covars.swapaxes(1,2).reshape(d,n*d) # concatenate gmm2 covariance matrices mi@0: loopn = xrange(n) mi@0: mi@0: logdet1 = np.zeros((n,1)) mi@0: kl11 = np.zeros((n,n)) mi@0: for i in loopn : mi@0: # step 1) precompute log(determinant()) of covariance matrices of gmm1 mi@0: logdet1[i] = log(det(self.covars.swapaxes(0,2)[:,:,i])) mi@0: mi@0: # step 2) compute reference kldiv between individual components of gmm1 mi@0: inv1 = inv(self.covars.swapaxes(0,2)[:,:,i]) mi@0: mm1 = self.means - self.means[i*ixm,:][0] mi@0: b1 = np.dot(inv1,t1).swapaxes(0,1).reshape(n,power(d,2)).T mi@0: kl11[:,i] = 0.5 * ( (logdet1[i]-d-logdet1)[:,0] + sum(b1[ixd,:],0).T + sum(np.dot(mm1,inv1) * mm1,1)) mi@0: # print kl11 mi@0: mi@0: logdet2 = np.zeros((n,1)) mi@0: kl22 = np.zeros((n,n)) mi@0: for i in loopn : mi@0: # step 3) precompute log(determinant()) of covariance matrices of gmm2 mi@0: logdet2[i] = log(det(other.covars.swapaxes(0,2)[:,:,i])) mi@0: inv2 = inv(other.covars.swapaxes(0,2)[:,:,i]) mi@0: mm2 = other.means - other.means[i*ixm,:][0] mi@0: b2 = np.dot(inv2,t2).swapaxes(0,1).reshape(n,power(d,2)).T mi@0: kl22[:,i] = 0.5 * ( (logdet2[i]-d-logdet2)[:,0] + sum(b2[ixd,:],0).T + sum(np.dot(mm2,inv2) * mm2,1)) mi@0: mi@0: # step 4) compute pair-wise kldiv between components of gmm1 and gmm2 mi@0: kl12 = np.zeros((n,n)) mi@0: kl21 = np.zeros((n,n)) mi@0: for i in loopn : mi@0: inv1 = inv(self.covars.swapaxes(0,2)[:,:,i]) mi@0: inv2 = inv(other.covars.swapaxes(0,2)[:,:,i]) mi@0: m12 = self.means - other.means[i*ixm,:][0] mi@0: m21 = other.means - self.means[i*ixm,:][0] mi@0: b1 = np.dot(inv1,t1).swapaxes(0,1).reshape(n,power(d,2)).T mi@0: b2 = np.dot(inv2,t2).swapaxes(0,1).reshape(n,power(d,2)).T mi@0: kl12[:,i] = 0.5 * ( (logdet2[i]-d-logdet1)[:,0] + sum(b2[ixd,:],0).T + sum(np.dot(m12,inv2) * m12,1)) mi@0: kl21[:,i] = 0.5 * ( (logdet1[i]-d-logdet2)[:,0] + sum(b1[ixd,:],0).T + sum(np.dot(m21,inv1) * m21,1)) mi@0: mi@0: # step 5) compute the final variational distance between gmm1 and gmm2 mi@0: kl_full_12 = np.dot(self.weights.T, (log(sum(exp(-kl11)*self.weights,1))) - log(sum(exp(-kl12)*other.weights,1))) mi@0: kl_full_21 = np.dot(other.weights.T, (log(sum(exp(-kl22)*other.weights,1))) - log(sum(exp(-kl21)*self.weights,1))) mi@0: return (kl_full_12 + kl_full_21 ) / 2.0 mi@0: mi@0: def emd(self, p1, p2): mi@0: '''Compute earth movers distance between component p1 and p2''' mi@0: mi@0: pass mi@0: mi@0: def c2(self, p1, p2): mi@0: d = power(det(p1.covars), -0.5) * power(det(p2.covars), -0.5) mi@0: return d mi@0: mi@0: def c2_distance(self, other): mi@0: '''Compute the c2 pdf distance metric mi@0: G. Sfikas etc. An analytic distance metric for Gaussian Mixture Models with application in image retrieval. ICANN 2005. mi@0: ''' mi@0: d12 = d11 = d22 = 0.0 mi@0: for i in xrange(self.n_components) : mi@0: for j in xrange(self.n_components) : mi@0: V = inv(inv(self.covars[i]) + inv(other.covars[j])) mi@0: K = self.means[i].T.dot(inv(self.covars[i])) * (self.means[i] - other.means[j]) + other.means[j].T.dot(inv(other.covars[j])) * (other.means[j] - self.means[i]) mi@0: mi@0: d12 += sum(self.weights[i] * other.weights[j] * power((det(V) / (exp(K) * self.c2(self.components[i], other.components[j]))), 0.5)) mi@0: d11 += sum(self.weights[i] * other.weights[j] * power((det(V) / (exp(K) * self.c2(self.components[i], self.components[j]))), 0.5)) mi@0: d22 += sum(self.weights[i] * other.weights[j] * power((det(V) / (exp(K) * self.c2(other.components[i], other.components[j]))), 0.5)) mi@0: mi@0: dist = -log(2 * d12 / (d11 + d22)) mi@0: mi@0: if isnan(dist) : mi@0: return np.finfo(np.float64).max mi@0: return dist mi@0: mi@0: def euclidean(self, p1, p2): mi@0: '''Compute euclidean distance between p1 and p2''' mi@0: if p1.means is None or p2.means is None: mi@0: return np.finfo(np.float64).max mi@0: d = sum(power(2 * pi * (power(p1.covars, 2) + power(p2.covars, 2)), -0.5) * exp(-0.5 * power(p1.means - p2.means, 2) / (power(p1.covars, 2) + power(p2.covars, 2)))) mi@0: mi@0: if isnan(d) : mi@0: return np.finfo(np.float64).max mi@0: return d mi@0: mi@0: def euclidean_distance(self, other): mi@0: '''Compute the pdf distance metric''' mi@0: e11 = e22 = e12 = 0.0 mi@0: for i in range(self.n_components) : mi@0: e11 += self.weights[i] * self.weights[i] * self.euclidean(self.components[i],self.components[i]) mi@0: e22 += other.weights[i] * other.weights[i] * self.euclidean(other.components[i],other.components[i]) mi@0: e12 += self.weights[i] * other.weights[i] * self.euclidean(self.components[i],other.components[i]) mi@0: mi@0: dist = e11 + e22 - 2 * e12 mi@0: mi@0: if isnan(dist) : mi@0: return np.finfo(np.float64).max mi@0: return dist mi@0: mi@0: def bhattacharyya(self, p1, p2): mi@0: '''Compute the Bhattacharyya based distance following: mi@0: K. Fukunaga. Introduction to statistical pattern recognition. Academic Press 1990 mi@0: ''' mi@0: B_dev = 0.125 * ((p1.means - p2.means).T.dot( inv((p1.covars + p2.covars) / 2))).dot(p1.means - p2.means)\ mi@0: + 0.5 * log( abs(inv((p1.covars + p2.covars) / 2)) / power(abs(inv(p1.covars) * inv(p2.covars)), 0.5) ) mi@0: mi@0: d = sum(B_dev) mi@0: if isnan(d) : mi@0: return np.finfo(np.float64).max mi@0: return d mi@0: mi@0: def bhattacharyya_distance(self, other): mi@0: '''Compute the pdf distance metric''' mi@0: dist = 0.0 mi@0: for i in xrange(self.n_components) : mi@0: dist += self.weights[i] * other.weights[i] *self.bhattacharyya(self.components[i], other.components[i]) mi@0: return dist mi@0: mi@0: mi@0: class AudioGenerator(object): mi@0: '''Generate simple audio data (sinusoidal etc. and noise and their combinations).''' mi@0: mi@0: def sine(self, freq=440.0, framerate=44100, amplitude=0.5, length=0.02): mi@0: n = int(length * framerate) mi@0: return np.array([amplitude * math.sin(2.0*math.pi * float(freq) * (float(i)/float(framerate))) for i in xrange(n)]) mi@0: mi@0: def noise(self, framerate=44100, amplitude=0.1, length=0.02): mi@0: n = int(length * framerate) mi@0: return np.array([float(amplitude) * np.random.uniform(-1, 1) for i in xrange(n)]) mi@0: mi@0: def square(self, freq=440.0, framerate=44100, amplitude=0.5, length=0.02): mi@0: for s in self.sine(freq, framerate, amplitude, length): mi@0: if s > 0: mi@0: yield amplitude mi@0: elif s < 0: mi@0: yield -amplitude mi@0: else: mi@0: yield 0.0 mi@0: mi@0: def damped(self, freq=440.0, framerate=44100, amplitude=0.5, length=0.02): mi@0: n = int(length*framerate) mi@0: return (exp(-(float(i%n)/float(framerate))) * s for i, s in enumerate(self.sine(frequency, framerate, amplitude, length))) mi@0: mi@0: def write_wav_file(self, samples,filename) : mi@0: '''Create a wav file and write it to disk.''' mi@0: nchannels, sampwidth, framerate, nframes = 1, 2, 44100, len(samples) mi@0: max_amplitude = 32767.0 mi@0: w = wave.open(filename, 'w') mi@0: w.setparams((nchannels, sampwidth, framerate, nframes, 'NONE', 'not compressed')) mi@0: frames = str().join(struct.pack('h', int(max_amplitude * s)) for s in samples) mi@0: w.writeframesraw(frames) mi@0: w.close() mi@0: print "wav file: %s written." %filename mi@0: mi@0: class FeatureExtractor(object): mi@0: '''Extract low level features of input audio samples for compare distance computation.''' mi@0: def main(): mi@0: mi@0: # ag = AudioGenerator() mi@0: # fe = FeatureExtractor() mi@0: # mi@0: # # Create a "test1.wav" file which is 10s length and contains two sinusoids + noise mi@0: # length = 10.0 # unit = sG mi@0: # samples = ag.sine(440.0,length=length) + ag.sine(240.0,length=length) + ag.noise(length=length) mi@0: # samples = samples / (max(samples) + 0.05) mi@0: # ag.write_wav_file(samples, "audio/test1.wav") mi@0: # # Create a file indentical to 'test1.wav mi@0: # ag.write_wav_file(samples,"audio/test1a.wav") mi@0: # # Create a file with the same componentes as "test1.wav" but the freq of one sinusouid different mi@0: # samples = ag.sine(440.0,length=length) + ag.sine(480.0,length=length) + ag.noise(length=length) mi@0: # samples = samples / (max(samples) + 0.05) mi@0: # ag.write_wav_file(samples, "audio/test1b.wav") mi@0: # # Create a file with the same componentes as "test1.wav" but the freq of both sinusouids different mi@0: # samples = ag.sine(880.0,length=length) + ag.sine(480.0,length=length) + ag.noise(length=length) mi@0: # samples = samples / (max(samples) + 0.05) mi@0: # ag.write_wav_file(samples, "audio/test1c.wav") mi@0: # mi@0: # # Create a file with one more sinusouid componentes as "test1.wav" mi@0: # samples = ag.sine(440.0,length=length) + ag.sine(240.0,length=length) + ag.sine(880.0,length=length) + ag.noise(length=length) mi@0: # samples = samples / (max(samples)+0.05) mi@0: # ag.write_wav_file(samples, "audio/test2a.wav") mi@0: # # Create a file with changed freq and one more sinusouid componentes as "test1.wav" mi@0: # samples = ag.sine(440.0,length=length) + ag.sine(240.0,length=length) + ag.sine(1320.0,length=length) + ag.noise(length=length) mi@0: # samples = samples / (max(samples)+0.05) mi@0: # ag.write_wav_file(samples, "audio/test2b.wav") mi@0: # mi@0: # # Create a file with longer length than "test1.wav" mi@0: # samples = ag.sine(440.0,length=15) + ag.sine(240.0,length=15) + ag.noise(length=15) mi@0: # samples = samples / (max(samples)+0.05) mi@0: # ag.write_wav_file(samples, "audio/test3a.wav") mi@0: # # Create a file with longer length and one more sinusoid than "test1.wav" mi@0: # samples = ag.sine(440.0,length=15) + ag.sine(240.0,length=15) + ag.sine(880.0,length=15) + ag.noise(length=15) mi@0: # samples = samples / (max(samples)+0.05) mi@0: # ag.write_wav_file(samples, "audio/test3b.wav") mi@0: mi@0: mi@0: # plt.plot(samples[:1000]) mi@0: # plt.show() mi@0: mi@0: mi@0: # print "Testing Gaussian Feature: Generating random features." mi@0: feature_array_1 = np.zeros((20,100),dtype=np.float64) mi@0: feature_array_2 = np.zeros((20,100),dtype=np.float64) mi@0: mi@0: r = rand(2) mi@0: mi@0: for x in xrange(100) : mi@0: feature_array_1[:,x] = rand(20) + r[0] mi@0: feature_array_2[:,x] = rand(20) + r[1] mi@0: mi@0: f1 = GmmDistance(feature_array_1) mi@0: f2 = GmmDistance(feature_array_2) mi@0: mi@0: print f1,f2,"\n" mi@0: mi@0: print "KL2 distance between f1-f2 using diag covariance:", f1.skl_distance_diag(f2) mi@0: print "KL2 distance between f1-f1 using diag covariance:", f1.skl_distance_diag(f1) mi@0: print "KL2 distance between f2-f2 using diag covariance:", f2.skl_distance_diag(f2) mi@0: print "KL2 distance between f2-f1 using diag covariance:", f2.skl_distance_diag(f1) mi@0: print '\n' mi@0: print "KL2 distance between f1-f2 using full covariance:", f1.skl_distance_full(f2) mi@0: print "KL2 distance between f1-f1 using full covariance:", f1.skl_distance_full(f1) mi@0: print "KL2 distance between f2-f2 using full covariance:", f2.skl_distance_full(f2) mi@0: print "KL2 distance between f2-f1 using full covariance:", f2.skl_distance_full(f1) mi@0: print '\n' mi@0: print "c2 distance between f1-f2:", f1.c2_distance(f2) mi@0: print "c2 distance between f1-f1:", f1.c2_distance(f1) mi@0: print "c2 distance between f2-f2:", f2.c2_distance(f2) mi@0: print "c2 distance between f2-f1:", f2.c2_distance(f1) mi@0: print '\n' mi@0: print "euclidean distance between f1-f2:", f1.euclidean_distance(f2) mi@0: print "euclidean distance between f1-f1:", f1.euclidean_distance(f1) mi@0: print "euclidean distance between f2-f2:", f2.euclidean_distance(f2) mi@0: print "euclidean distance between f2-f1:", f2.euclidean_distance(f1) mi@0: print '\n' mi@0: print "bhattacharyya distance between f1-f2:", f1.bhattacharyya_distance(f2) mi@0: print "bhattacharyya distance between f1-f1:", f1.bhattacharyya_distance(f1) mi@0: print "bhattacharyya distance between f2-f2:", f2.bhattacharyya_distance(f2) mi@0: print "bhattacharyya distance between f2-f1:", f2.bhattacharyya_distance(f1) mi@0: mi@0: # ag = AudioGenerator() mi@0: # sound = ag.synthesisor(nChannels = 2, framerate=44100, amplitude=0.5, length=0.02) mi@0: # ag.write_wav_file(samples=sound, nframes=None, length=0.02, filename='audio/test.wav') mi@0: # mi@0: # l = dir(__builtins__) mi@0: # d = __builtins__.__dict__ mi@0: # pprint(l) mi@0: # pprint(d) mi@0: mi@0: # # Load audio data mi@0: # audio_files = [i for i in os.listdir(audioDir) if i.endswith('wav') and not i.startswith('.')] mi@0: # audio_list = [] mi@0: # for i in audio_files: mi@0: # ao = AudioObj() mi@0: # ao.name = splitext(i)[0] mi@0: # ao.data = np.array(read(join(audioDir, i))[1], dtype=float) mi@0: # audio_list.append(ao) mi@0: # mi@0: # # Calculate pairwise ditances between audio data using listed metrics mi@0: # res_list = [] mi@0: # for a1, a2 in combinations(audio_list, 2): mi@0: # # basic info mi@0: # res = DistanceObj() mi@0: # f1 = GmmDistance(a1.data) mi@0: # f2 = GmmDistance(a2.data) mi@0: # res.audio1 = a1.name mi@0: # res.audio2 = a2.name mi@0: # res.nComponents = len(f1.components) mi@0: # mi@0: # # distances between two samples should be symmetric mi@0: # res.skl_full = f1.skl_distance_full(f2) mi@0: # res.skl_diag = f1.skl_distance_diag(f2) mi@0: # res.c2 = f1.c2_distance(f2) mi@0: # res.euclidean = f1.euclidean_distance(f2) mi@0: # res.bhattacharyya = f1.bhattacharyya_distance(f2) mi@0: # mi@0: # res.save() mi@0: # res_list.append(res) mi@0: mi@0: mi@0: mi@0: if __name__ == '__main__': mi@0: main() mi@0: mi@0: mi@0: mi@0: