Mercurial > hg > chourdakisreiss2016
view experiment-reverb/code/#training.py# @ 2:c87a9505f294 tip
Added LICENSE for code, removed .wav files
author | Emmanouil Theofanis Chourdakis <e.t.chourdakis@qmul.ac.uk> |
---|---|
date | Sat, 30 Sep 2017 13:25:50 +0100 |
parents | 246d5546657c |
children |
line wrap: on
line source
#!/usr/bin/python2 # -*- coding: utf-8 -*- """ Created on Thu Apr 23 11:53:17 2015 @author: mmxgn """ # This file does the cluster estimation and the removal of outliers from warnings import filterwarnings from sys import argv, exit from essentia.standard import YamlInput, YamlOutput from essentia import Pool from pca import * from numpy import * from sklearn import cluster from sklearn.metrics import pairwise_distances from sklearn.cluster import KMeans, MiniBatchKMeans from matplotlib.pyplot import * #from sklearn.mixture import GMM from sklearn.naive_bayes import GaussianNB, MultinomialNB from scipy.signal import decimate from sklearn import cross_validation from sklearn import multiclass from sklearn.covariance import EllipticEnvelope #from hmmlearn import hmm from hmmlearn.hmm import GMM from hmmlearn import hmm from sklearn import svm import copy as cp from scipy.misc import logsumexp import pickle from collections import Counter #from adpcm import adm, adm_reconstruct from reshape_observations import reshape_observations mse = lambda A,B: ((array(A)-array(B)) ** 2).mean() lognorm = lambda A: A - logsumexp(A) rc('font',**{'family':'sans-serif','sans-serif':['Helvetica']}) ## for Palatino and other serif fonts use: #rc('font',**{'family':'serif','serif':['Palatino']}) rc('text', usetex=True) #logsum = lambda X: logsumexp(log(X)) filterwarnings("ignore") C = 0.7 NCOMPONENTS=2 CHAINSIZE = 5 close('all') from numpy.random import randint class HMMsvm: def __init__(self, svm_): self.svm = svm_ # self.svm = MyAVAClassifier() def fit(self, features_list, states, flr=1e-13): # def fit(self, features_list, flr=1e-13): # TODO: Concatenate features, train #self.svm.fit(X, states, flr) #self.prior = self.svm.prior #self.transmat = self.svm.transmat features = None for f in features_list: if features is None: features = f else: features = append(features, f, 0) fullstates = None # T = features.shape[0] for s in states: if fullstates is None: fullstates = s else: fullstates = append(fullstates, s) self.n_classes = self.svm.n_classes n_classes = self.n_classes # print fullstates, shape(fullstates) h = histogram(fullstates, n_classes)[0].astype(float) self.prior = h/sum(h) # Add a very small probability for random jump self.prior += flr self.prior = self.prior/sum(self.prior) transmat = zeros((n_classes, n_classes)) transitions = zeros((n_classes, )) for s in states: for i in range(1, len(s)): prev = s[i-1] cur = s[i] transmat[prev,cur] += 1 transitions[prev] += 1 transitions[transitions == 0] = 1 for i in range(0, transmat.shape[0]): transmat[i,:] = transmat[i,:]/transitions[i] self.transmat = transmat # Add a very small probability for random jump to avoid zero values self.transmat += flr for i in range(0, self.transmat.shape[0]): self.transmat[i,:] = self.transmat[i,:]/sum(self.transmat[i,:]) A = zeros((n_classes, n_classes)) Aold = self.transmat while mse(A,Aold)>10*size(A)*flr: Aold = A.copy() A = zeros((n_classes, n_classes)) transitions = zeros((n_classes, )) for i in range(0,len(features_list)): f = features_list[i] # print "FEATURES:", f # print f s,p = self.logviterbi(f) # print s for j in range(1, len(s)): prev = s[j-1] cur = s[j] A[prev,cur] += 1 transitions[prev] += 1 transitions[transitions == 0] = 1 for i in range(0, A.shape[0]): A[i,:] = A[i,:]/transitions[i] A += flr self.transmat = A for i in range(0, A.shape[0]): if sum(A[i,:]) > 0: A[i,:] = A[i,:]/sum(A[i,:]) # print observations def estimate_emission_probability(self, x, q): post = self.svm.estimate_posterior_probability_vector(x) # print "post: ", post prior = self.prior # print "prior: ", prior p = post/prior p = p/sum(p) return p[q] # def estimate_emission_probability(self, x, q): # return self.svm.estimate_emission_probability(q, x) def predict(self, X): return self.logviterbi(X)[0] def logviterbi(self, X): # Number of states N = self.n_classes # Number of observations T = X.shape[0] transmat = self.transmat #1. Initalization a1 = self.prior delta = zeros((T,N)) psi = zeros((T,N)) for i in range(0, N): delta[0, i] = log(self.transmat[0,i]) + log(self.estimate_emission_probability(X[0,:],i)) #2. Recursion for t in range(1, T): # delta_old = delta.copy() x = X[t, :] for j in range(0, N): # print "j: %d, S" % j, delta_old+log(transmat[:,j]) delta[t,j] = max(delta[t-1,:] + log(transmat[:,j])) + log(self.estimate_emission_probability(x,j)) psi[t,j] = argmax(delta[t-1,:] + log(transmat[:,j])) # print "t: %d, delta: " % t, delta # print "t: %d, psi:" % t, psi # 3. Termination p_star = max(delta[T-1,:] + log(transmat[:,N-1])) q_star = argmax(delta[T-1,:] + log(transmat[:, N-1])) # 4. Backtracking q = zeros((T,)) p = zeros((T,)) q[-1] = q_star p[-1] = p_star for t in range(1, T): q[-t-1] = psi[-t, q[-t]] p[-t-1] = delta[-t, q[-t]] return q,p def viterbi(self, X): # Number of states N = self.n_classes # Number of observations T = X.shape[0] transmat = self.transmat #1. Initialization a1 = self.prior delta = zeros((N, )) psi = zeros((N, )) for i in range(0, N): delta[i] = a1[i]*self.estimate_emission_probability(X[0,:],i) #2. Recursion for t in range(1, T): delta_old = delta.copy() x = X[t,:] for j in range(0, N): delta[j] = max(delta_old*transmat[:,j])*self.estimate_emission_probability(x, j) psi[j] = argmax(delta_old*transmat[:,j]) #3. Termination p_star = max(delta*transmat[:,N-1]) q_star = argmax(delta*transmat[:,N-1]) #4. Backtracking q = zeros((T,)) q[-1] = q_star for t in range(1, T): q[-t-1] = psi[q[-t]] return q def _log_likelihood_matrix(self, X): N = self.n_classes ll = zeros((X.shape[0], N)) for i in range(0, X.shape[0]): ll[i,:] = self._forward(i, X) return ll def compute_ksus(self, X): N = self.n_classes T = X.shape[0] print "[II] Computing gammas... " gammas = self._forward_backward(X) # Initialize aij = self.transmat def _not_quite_ksu(self, t, i, j, X): alpha = exp(self.forward(X[0:t+1,:]))[i] beta = exp(self.backward(X[t+1:,:]))[j] aij = self.transmat[i,j] bj = self.estimate_emission_probability(X[t+1,:], j) return alpha*beta*aij*bj def _ksu(self, t, i, j, X): alphaT = exp(self.forward(X[0:t+1,:]))[0] return self._not_quite_ksu(t,i,j,X) def _forward_backward(self, X): T = X.shape[0] N = self.n_classes fv = zeros((T, N)) sv = zeros((T, N)) b = None for t in range(1, T+1): fv[t-1,:] = self._forward_message(fv[t-2,:], X[0:t, ]) for t in range(1, T+1): b = self._backward_message(b, X[-t: , :]) sv[-t,:] = lognorm(fv[t-1,:]*b) return sv def _backward(self, t, X): N = self.n_classes a = ones((N,))/N beta = zeros((N, )) transmat = self.transmat T = X.shape[0] x = X[t, :] if t == T-1: beta = log(a) return beta else: tosum = zeros((N, )) bt = self._backward(t+1, X) btnew = zeros((N, )) # print bt for j in range(0, N): for i in range(0, N): # print log(self.estimate_emission_probability(x, j)), bt[i], log(transmat[i,j]) tosum[i] = log(self.estimate_emission_probability(x, j)) + bt[i] + log(transmat[i,j]) # print tosum btnew[j] = logsumexp(tosum) btnew = lognorm(btnew) return btnew def score(self, X): return self.forward(X) def forward(self, X): T = X.shape[0] f = None for t in range(1, T+1): f = self._forward_message(f, X[0:t, :]) return f def backward(self, X): T = X.shape[0] b = None for t in range(1,T+1): # print t # print t,b idx = arange(-t,T) # print idx b = self._backward_message(b, X[-t:, :]) return b def _backward_message(self, b, X): N = self.n_classes beta = zeros((N, )) transmat = self.transmat x = X[0, :] if X.shape[0] == 1: beta = log(ones((N,))/N) return beta else: tosum = zeros((N, )) btnew = zeros((N, )) for j in range(0, N): for i in range(0, N): tosum[i] = log(self.estimate_emission_probability(x,j)) + b[i] + log(transmat[i,j]) btnew[j] = logsumexp(tosum) # btnew = lognorm(btnew) return btnew def _forward_message(self, f, X): N = self.n_classes a = self.prior alpha = zeros((N, )) transmat = self.transmat x = X[-1, :] if X.shape[0] == 1: for i in range(0, N): alpha[i] = log(a[i]) + log(self.estimate_emission_probability(x, i)) # alpha = lognorm(alpha) return alpha else: tosum = zeros((N,)) ftnew = zeros((N,)) for j in range(0, N): for i in range(0, N): tosum[i] = f[i] + log(transmat[i,j]) Sum = logsumexp(tosum) bj = self.estimate_emission_probability(x, j) ftnew[j] = Sum+log(bj) # ftnew = lognorm(ftnew) return ftnew def _forward(self, t, X): N = self.n_classes a = self.prior # print a alpha = zeros((N, )) transmat = self.transmat x = X[t, :] if t == 0: for i in range(0, N): alpha[i] = log(a[i]) + log(self.estimate_emission_probability(x, i)) # print "--" # print alpha alpha = lognorm(alpha) # print alpha # print "xx" return alpha else: tosum = zeros((N, )) ft = self._forward(t-1, X) ftnew = zeros((N, )) for j in range(0, N): for i in range(0, N): # print ft tosum[i] = ft[i] + log(transmat[i,j]) Sum = logsumexp(tosum) bj = self.estimate_emission_probability(x, j) ftnew[j] = Sum+log(bj) ftnew = lognorm(ftnew) return ftnew class HMMsvm2: def __init__(self, svm_, n_components=4): self.svm = svm_ self.n_components = n_components # self.svm = MyAVAClassifier() def fit(self, features_list, flr=1e-13): # def fit(self, features_list, flr=1e-13): # TODO: Concatenate features, train #self.svm.fit(X, states, flr) #self.prior = self.svm.prior #self.transmat = self.svm.transmat n_classes = self.n_components self.n_classes = n_classes A = zeros((n_classes, n_classes)) Aold = randn(n_classes, n_classes) self.transmat = Aold self.prior = randn(n_classes) self.prior = self.prior/sum(self.prior) for i in range(0, Aold.shape[0]): Aold[i,:] = Aold[i,:]/sum(Aold[i,:]) while mse(A,Aold)>10*size(A)*flr: Aold = A.copy() A = zeros((n_classes, n_classes)) transitions = zeros((n_classes, )) for i in range(0,len(features_list)): f = features_list[i] # print "FEATURES:", f # print f s,p = self.logviterbi(f) # print s h = histogram(s, n_classes)[0].astype(float) self.prior = h/sum(h) self.prior += flr self.prior = self.prior/sum(self.prior) for j in range(1, len(s)): prev = s[j-1] cur = s[j] A[prev,cur] += 1 transitions[prev] += 1 transitions[transitions == 0] = 1 for i in range(0, A.shape[0]): A[i,:] = A[i,:]/transitions[i] A += flr self.transmat = A for i in range(0, A.shape[0]): if sum(A[i,:]) > 0: A[i,:] = A[i,:]/sum(A[i,:]) # print observations def estimate_emission_probability(self, x, q): post = self.svm.estimate_posterior_probability_vector(x) # print "post: ", post prior = self.prior # print "prior: ", prior p = post/prior p = p/sum(p) return p[q] # def estimate_emission_probability(self, x, q): # return self.svm.estimate_emission_probability(q, x) def predict(self, X): return self.logviterbi(X)[0] def logviterbi(self, X): # Number of states N = self.n_classes # Number of observations T = X.shape[0] transmat = self.transmat #1. Initalization # a1 = self.prior delta = zeros((T,N)) psi = zeros((T,N)) for i in range(0, N): delta[0, i] = log(self.transmat[0,i]) + log(self.estimate_emission_probability(X[0,:],i)) #2. Recursion for t in range(1, T): # delta_old = delta.copy() x = X[t, :] for j in range(0, N): # print "j: %d, S" % j, delta_old+log(transmat[:,j]) delta[t,j] = max(delta[t-1,:] + log(transmat[:,j])) + log(self.estimate_emission_probability(x,j)) psi[t,j] = argmax(delta[t-1,:] + log(transmat[:,j])) # print "t: %d, delta: " % t, delta # print "t: %d, psi:" % t, psi # 3. Termination p_star = max(delta[T-1,:] + log(transmat[:,N-1])) q_star = argmax(delta[T-1,:] + log(transmat[:, N-1])) # 4. Backtracking q = zeros((T,)) p = zeros((T,)) q[-1] = q_star p[-1] = p_star for t in range(1, T): q[-t-1] = psi[-t, q[-t]] p[-t-1] = delta[-t, q[-t]] return q,p def viterbi(self, X): # Number of states N = self.n_classes # Number of observations T = X.shape[0] transmat = self.transmat #1. Initialization a1 = self.prior delta = zeros((N, )) psi = zeros((N, )) for i in range(0, N): delta[i] = a1[i]*self.estimate_emission_probability(X[0,:],i) #2. Recursion for t in range(1, T): delta_old = delta.copy() x = X[t,:] for j in range(0, N): delta[j] = max(delta_old*transmat[:,j])*self.estimate_emission_probability(x, j) psi[j] = argmax(delta_old*transmat[:,j]) #3. Termination p_star = max(delta*transmat[:,N-1]) q_star = argmax(delta*transmat[:,N-1]) #4. Backtracking q = zeros((T,)) q[-1] = q_star for t in range(1, T): q[-t-1] = psi[q[-t]] return q def _log_likelihood_matrix(self, X): N = self.n_classes ll = zeros((X.shape[0], N)) for i in range(0, X.shape[0]): ll[i,:] = self._forward(i, X) return ll def compute_ksus(self, X): N = self.n_classes T = X.shape[0] print "[II] Computing gammas... " gammas = self._forward_backward(X) # Initialize aij = self.transmat def _not_quite_ksu(self, t, i, j, X): alpha = exp(self.forward(X[0:t+1,:]))[i] beta = exp(self.backward(X[t+1:,:]))[j] aij = self.transmat[i,j] bj = self.estimate_emission_probability(X[t+1,:], j) return alpha*beta*aij*bj def _ksu(self, t, i, j, X): alphaT = exp(self.forward(X[0:t+1,:]))[0] return self._not_quite_ksu(t,i,j,X) def _forward_backward(self, X): T = X.shape[0] N = self.n_classes fv = zeros((T, N)) sv = zeros((T, N)) b = None for t in range(1, T+1): fv[t-1,:] = self._forward_message(fv[t-2,:], X[0:t, ]) for t in range(1, T+1): b = self._backward_message(b, X[-t: , :]) sv[-t,:] = lognorm(fv[t-1,:]*b) return sv def _backward(self, t, X): N = self.n_classes a = ones((N,))/N beta = zeros((N, )) transmat = self.transmat T = X.shape[0] x = X[t, :] if t == T-1: beta = log(a) return beta else: tosum = zeros((N, )) bt = self._backward(t+1, X) btnew = zeros((N, )) # print bt for j in range(0, N): for i in range(0, N): # print log(self.estimate_emission_probability(x, j)), bt[i], log(transmat[i,j]) tosum[i] = log(self.estimate_emission_probability(x, j)) + bt[i] + log(transmat[i,j]) # print tosum btnew[j] = logsumexp(tosum) btnew = lognorm(btnew) return btnew def score(self, X): return self.forward(X) def forward(self, X): T = X.shape[0] f = None for t in range(1, T+1): f = self._forward_message(f, X[0:t, :]) return f def backward(self, X): T = X.shape[0] b = None for t in range(1,T+1): # print t # print t,b idx = arange(-t,T) # print idx b = self._backward_message(b, X[-t:, :]) return b def _backward_message(self, b, X): N = self.n_classes beta = zeros((N, )) transmat = self.transmat x = X[0, :] if X.shape[0] == 1: beta = log(ones((N,))/N) return beta else: tosum = zeros((N, )) btnew = zeros((N, )) for j in range(0, N): for i in range(0, N): tosum[i] = log(self.estimate_emission_probability(x,j)) + b[i] + log(transmat[i,j]) btnew[j] = logsumexp(tosum) # btnew = lognorm(btnew) return btnew def _forward_message(self, f, X): N = self.n_classes a = self.prior alpha = zeros((N, )) transmat = self.transmat x = X[-1, :] if X.shape[0] == 1: for i in range(0, N): alpha[i] = log(a[i]) + log(self.estimate_emission_probability(x, i)) # alpha = lognorm(alpha) return alpha else: tosum = zeros((N,)) ftnew = zeros((N,)) for j in range(0, N): for i in range(0, N): tosum[i] = f[i] + log(transmat[i,j]) Sum = logsumexp(tosum) bj = self.estimate_emission_probability(x, j) ftnew[j] = Sum+log(bj) # ftnew = lognorm(ftnew) return ftnew def _forward(self, t, X): N = self.n_classes a = self.prior # print a alpha = zeros((N, )) transmat = self.transmat x = X[t, :] if t == 0: for i in range(0, N): alpha[i] = log(a[i]) + log(self.estimate_emission_probability(x, i)) # print "--" # print alpha alpha = lognorm(alpha) # print alpha # print "xx" return alpha else: tosum = zeros((N, )) ft = self._forward(t-1, X) ftnew = zeros((N, )) for j in range(0, N): for i in range(0, N): # print ft tosum[i] = ft[i] + log(transmat[i,j]) Sum = logsumexp(tosum) bj = self.estimate_emission_probability(x, j) ftnew[j] = Sum+log(bj) ftnew = lognorm(ftnew) return ftnew class NBClassifier: def __init__(self): print "[II] Gaussian Naive Bayes Classifier" self.name = "Naive Bayes" self.smallname = "gnbc" self.gnb = GaussianNB() def getName(self): return self.name def fit(self, X, states): self.gnb.fit(X, states) def predict(self, X): return self.gnb.predict(X) def cross_validate(self, data, classes): print "[II] SVN Classifier Crossvalidating... " from copy import deepcopy estimator = deepcopy(self) estimator_fulldata = deepcopy(self) estimator_fulldata.fit(data, classes) percents = arange(0.1,0.9,0.1) self.percents = percents * 100 RATIOS = [] labels = estimator.predict(data) print "[II] for full training-testing: %.2f" % (sum(array(classes==labels).astype(float))/len(labels)) for p in percents: traindata,testdata,trainlabels,testlabels = cross_validation.train_test_split(data,classes,test_size=p,random_state=1) # print "[II] [%.2f-%.2f] Examples used for training: %s" % (p, (1-p), str(traindata)) # print "[II] [%.2f-%.2f] Examples used for testing: %s" % (p, (1-p), str(testdata)) # Training phase estimator_ = deepcopy(estimator) estimator_.fit(traindata, trainlabels) predicted_labels = estimator_.predict(testdata) m = sum(array(predicted_labels==testlabels).astype(float))/len(testlabels) print "[II] [%.2f-%.2f] 5-fold cross-validation for NB Classifier: %.1f" % (p, (1-p), m) RATIOS.append(m) return RATIOS,percents class HmmClassifier3: def __init__(self, N=2,n_components = 1): print "[II] HMM Classifier (%d states, %d components)" % (N, n_components) self.name = "HMM (%d time steps, %d components)" % (N, n_components) self.n_components = n_components self.n_components = n_components self.smallname = "hmmc" self.chain_size = N def getName(self): return self.name def fit(self, features, parameters): # print "Parameters:" # print parameters n_classes = max(unique(parameters)) + 1 if n_classes == 1: self.only_one_class = True return else: self.only_one_class = False print "[II] Number of classes: ", n_classes hmms = [None]*n_classes chain_size = self.chain_size obs = [None]*n_classes for i in range(chain_size, len(parameters)): class_ = parameters[i] seq = features[i-chain_size:i,:] if obs[class_] is None: obs[class_] = [seq] else: obs[class_].append(seq) for i in range(0, len(obs)): if obs[i] is not None and len(obs[i]) != 0: hmm_ = hmm.GaussianHMM(n_components=self.n_components, covariance_type='diag') # print(obs[i]) #obs_ = reshape_observations(obs[i]) obs_ = concatenate(obs[i]) #lengths_ = [o.shape[0] for o in obs[i]] #print (lengths_) # print len(obs[i]) # print obs[i][0][0].shape # print obs[i] # for s in obs[i]: # if s.ndim > 2: # print s hmm_.fit(obs_, [self.chain_size]*len(obs[i])) hmms[i] = hmm_ self.hmms = hmms return obs def predict(self, features, mfilt=20): if self.only_one_class == True: return zeros((features.shape[0], )) chain_size = self.chain_size hmms = self.hmms predicted_classes = zeros((features.shape[0],)) for i in range(chain_size, features.shape[0]): scores = zeros((len(hmms),)) seq = features[i-chain_size:i, :] for j in range(0, len(hmms)): if hmms[j] is not None: scores[j] = hmms[j].score(seq) else: scores[j] = -infty predicted_classes[i] = argmax(scores) # Do a median filter at the end # predicted_classes = median_filter(predicted_classes,mfilt) return predicted_classes def k_fold_cross_validate(self, data, classes, K=5): print "[II] HMM Classifier Crossvalidating... " print "[II] Validating data: ", data print "[II] With classes: ", classes from copy import deepcopy estimator = deepcopy(self) estimator_fulldata = deepcopy(self) estimator_fulldata.fit(data, classes) labels = estimator_fulldata.predict(data) print "[II] for full training-testing: %.2f" % (sum(array(classes==labels).astype(float))/len(classes)) # Crossvalidate this. example_sequences = [] example_labels = [] chain_size = self.chain_size percents = arange(0.1,0.9,0.1) self.percents = percents * 100 RATIOS = [] # Constructing examples and labels L = data.shape[0]/K M = K # print "[II] Example size:", L n = 1 while L*(n-1) < M*L: if L*n > shape(data)[0]: example = data[L*(n-1):,:] classes_ = classes[L*(n-1):] else: example = data[L*(n-1):L*n, :] classes_ = classes[L*(n-1):L*n] # print example # print classes_ example_sequences.append(example) example_labels.append(classes_) n+=1 # print len(example_sequences) # print len(example_labels) from sklearn.cross_validation import KFold kf = KFold(K, n_folds=K) ratio = 0 for train, test in kf: print "[II] Validating examples %s against %s." % (train, test) topredict_data = example_sequences[test[0]] topredict_labels = example_labels[test[0]] tofit_data = None tofit_labels = None for t in train: # print t if tofit_data is None: tofit_data = example_sequences[t] tofit_labels = example_labels[t] else: tofit_data = append(tofit_data, example_sequences[t], 0) tofit_labels = append(tofit_labels, example_labels[t], 0) estimator_ = deepcopy(estimator) estimator_.fit(tofit_data, tofit_labels) labels = estimator_.predict(topredict_data) m = sum(array(topredict_labels==labels).astype(float))/len(labels) print "[II] %d-fold cross-validation for HMM Classifier: %.1f" % (K, m) ratio += m/float(len(kf)) return ratio # Splitting to train/test def cross_validate(self, data, classes): print "[II] HMM Classifier Crossvalidating... " from copy import deepcopy estimator = deepcopy(self) estimator_fulldata = deepcopy(self) estimator_fulldata.fit(data, classes) labels = estimator_fulldata.predict(data) print "[II] for full training-testing: %.2f" % (sum(array(classes==labels).astype(float))/len(classes)) # Crossvalidate this. example_sequences = [] example_labels = [] chain_size = self.chain_size percents = arange(0.5,0.9,0.1) self.percents = percents * 100 RATIOS = [] # Constructing examples and labels M = self.chain_size L = data.shape[0]/20 print "[II] Example size:", L n = 1 while L*n < M*L-L: example = data[L*(n-1):L*n, :] # print example classes_ = classes[L*(n-1):L*n] # print classes_ example_sequences.append(example) example_labels.append(classes_) n+=1 # Splitting to train/test for p in percents: traindata,testdata,trainlabels,testlabels = cross_validation.train_test_split(example_sequences, example_labels, test_size=p, random_state=1) print traindata print testdata # Concatenate traindata tofit_data = None tofit_labels = None topredict_data = None topredict_labels = None for t in traindata: if tofit_data is None: tofit_data = t else: tofit_data = append(tofit_data, t, 0) for l in trainlabels: if tofit_labels is None: tofit_labels = l else: tofit_labels = append(tofit_labels, l) for t in testdata: if topredict_data is None: topredict_data = t else: topredict_data = append(topredict_data, t, 0) for l in testlabels: if topredict_labels is None: topredict_labels = l else: topredict_labels = append(topredict_labels, l) # print "[II] Fit data: ", tofit_data ## print "[II] Fit labels: ", tofit_labels # print "[II] Predict data: ", topredict_data # print "[II] Predict labels: ", topredict_labels # estimator_ = deepcopy(estimator) print tofit_labels estimator_.fit(tofit_data, tofit_labels) labels = estimator_.predict(topredict_data) m = sum(array(topredict_labels==labels).astype(float))/len(labels) print "[II] [%.2f-%.2f] cross-validation for HMM Classifier: %.1f" % (p, (1-p), m) RATIOS.append(m) return RATIOS,percents class HMMsvmClassifier2: def __init__(self, N=2): self.classifiers = {} self.name = "HMM-SVM Classifier 2" self.smallname = "hmmsvm2" self.obs = MyAVAClassifier() self.chain_size = N def getName(self): return self.name def fit(self, X, y): self.n_classes = max(unique(y))+1 n_classes = self.n_classes chain_size = self.chain_size if n_classes == 1: self.only_one_class = True return else: self.only_one_class = False print "[II] Number of classes: ", n_classes hmms = [None]*n_classes obs = [None]*n_classes for i in range(chain_size, len(parameters)): class_ = parameters[i-1] class HMMsvmClassifier: def __init__(self, N=2): self.classifiers = {} self.name = "HMM-SVM Classifier" self.smallname = "hmmsvm" self.obs = MyAVAClassifier() self.chain_size = N def getName(self): return self.name def fit(self, X, y): self.n_classes = max(unique(y))+1 self.obs.fit(X,y) self.hmm = HMMsvm(self.obs) self.hmm.fit([X],[y]) def predict(self, X): # chain_size = self.chain_size # predicted = zeros((X.shape[0]),) # for i in range(chain_size, len(predicted)): # predicted[i] = self.hmm.predict(X[i-chain_size:i,:])[-1] # return predicted return self.hmm.predict(X) def confidence(self, x, q): return self.hmm.estimate_emission_probability(x, q) class SinkHoleClassifier: def __init__(self): self.classifierNB = NBClassifier() self.classifierSVM = MyAVAClassifier() self.classifierHMM = HmmClassifier3() self.classifierHMMSVM = HMMsvmClassifier() def getName(self): return self.name def fit(self, X, y): self.n_classes = max(unique(y))+1 self.classifierNB.fit(X,y) self.classifierSVM.fit(X,y) self.classifierHMM.fit(X,y) self.classifierHMMSVM.fit(X,y) def predict(self, X): predictedNB = self.classifierNB.predict(X) predictedSVM = self.classifierSVM.predict(X) predictedHMM = self.classifierHMM.predict(X) predictedHMMSVM = self.classifierHMMSVM.predict(X) predicted = zeros((X.shape[0], )) for i in range(0, len(predicted)): candidates = [predictedNB[i], predictedSVM[i], predictedHMM[i], predictedHMMSVM[i]] c = Counter(candidates) most_common = c.most_common() # If there is an equal voting, select something at random if len(unique([k[1] for k in most_common])) == 1: predicted[i] = most_common[randint(len(most_common))][0] else: predicted[i] = most_common[0][0] return predicted class MyAVAClassifier: def __init__(self): self.classifiers = {} self.name = "Linear SVM Classifier" self.smallname = "svc-ava" # self.probabilities = {} def getName(self): return self.name def fit(self, X, y, flr = 0): n_classes = max(unique(y)) + 1 # Transform to a binary if there are only two classes if len(unique(y)) == 1: self.only_one_class = True self.n_classes = 1 self.one_class_label = y[0] return elif len(unique(y)) == 2: self.n_classes = n_classes self.svm = svm.SVC(probability = True, kernel='poly', gamma=2, C = C) self.svm.fit(X,y) classes_ = unique(y) self.classifiers[(classes_[0], classes_[1])] = self.svm self.only_two_classes = True self.only_one_class = False return else: self.only_two_classes = False self.only_one_class = False classes = arange(0, n_classes) self.n_classes = n_classes # M = n_classes*(n_classes-1) # Number of classifiers h = histogram(y, n_classes)[0].astype(float) self.prior = h/sum(h) transmat = zeros((n_classes, n_classes)) for i in range(1, len(y)): prev = y[i-1] cur = y[i] transmat[prev,cur] += 1 transmat = transmat/sum(transmat) self.transmat = transmat # Add a very small probability for random jump to avoid zero values self.transmat += flr self.transmat = self.transmat/sum(self.transmat) for i in range(0, n_classes): for j in range(0, n_classes): if i != j and (i,j) not in self.classifiers and (j,i) not in self.classifiers: # print (i,j) # print classes idx_ = bitwise_or(y == classes[i], y == classes[j]) # idx_ = bitwise_or(y == i, y == j) X_ = X[idx_, :] y_ = y[idx_] #print y_ if len(unique(y_)) > 1: svm_ = svm.SVC(probability = True, kernel='poly', gamma=2, C = C) # print y_ svm_.fit(X_, y_) self.classifiers[(i,j)] = svm_ # self.probabilities[(i,j)] = svm_.predict_proba(X) def estimate_pairwise_class_probability(self, i, j, x): # print self.classifiers.keys() if (i,j) not in self.classifiers and (j,i) in self.classifiers: return self.classifiers[(j,i)].predict_proba(x)[0,1] elif (i,j) not in self.classifiers and (j,i) not in self.classifiers: return 0.0 else: return self.classifiers[(i,j)].predict_proba(x)[0,0] def estimate_posterior_probability(self, i, x): mus = zeros((self.n_classes,)) for j in range(0, self.n_classes): if i != j: pcp = self.estimate_pairwise_class_probability(i,j,x) pcp += 1e-18 mus[j] = 1/pcp # print mus S = sum(mus) - (self.n_classes - 2) # print S return 1/S def estimate_posterior_probability_vector(self, x): posterior = zeros((self.n_classes,)) for i in range(0, len(posterior)): posterior[i] = self.estimate_posterior_probability(i, x) return posterior # def estimate_emission_probability(self, i, x): # post = self.estimate_posterior_probability_vector(x) # # print "post: ", post # prior = self.prior # # print "prior: ", prior # p = post/prior # p = p/sum(p) # # return p[i] # def estimate_emissmat(self, X): # emissmat = zeros((X.shape[0], self.n_classes)) # for i in range(0, X.shape[0]): # for j in range(0, self.n_classes): # emissmat[i,j] = self.estimate_emission_probability(j, X[i,:]) # # return emissmat def predict(self, X): predicted = zeros((X.shape[0],)) if self.only_one_class == True: return ones((X.shape[0],))*self.one_class_label elif self.only_two_classes == True: return self.svm.predict(X) for i in range(0, X.shape[0]): x = X[i,:] P = zeros((self.n_classes,)) for c in range(0, len(P)): P[c] = self.estimate_posterior_probability(c, x) pred = argmax(P) predicted[i] = pred return predicted def logsum(X): if len(X) == 1: return log(X[0]) else: return logaddexp(log(X[0]),logsum(X[1:])) def smooth_matrix_1D(X): window = scipy.signal.gaussian(51,4) window = window/sum(window) intermx = zeros((X.shape[0],X.shape[1]+100)) intermx[:, 50:-50] = X for m in range(0, X.shape[0]): # print intermx.shape intermx[m,:] = convolve(ravel(intermx[m,:]), window,'same') return intermx[:,50:-50] def adm_reconstruct(codeword, h, dmin=.01, dmax=.28): x = zeros((1, codeword.shape[1])) delta1 = dmin delta2 = dmin Sum = h x[0] = h for i in range(0, codeword.shape[1]): if codeword[0,i] == 0: delta1 = dmin delta2 = dmin elif codeword[0,i] == 1: delta2 = dmin Sum += delta1 delta1 *= 2 if delta1 > dmax: delta1 = dmax elif codeword[0,i] == -1: delta1 = dmin Sum -= delta2 delta2 *= 2 if delta2 > dmax: delta2 = dmax x[0,i] = Sum return x def adm(x, dmin=.01, dmax=.28, tol=0.0001): # Adaptive delta modulation adapted by code: # (adeltamod.m) # # Adaptive Delta Modulator # by Gandhar Desai (gdesai) # BITS Pilani Goa Campus # Date: 28 Sept, 2013 xsig = x Lx = len(x) ADMout = zeros((1, Lx)) codevec = zeros((1, Lx)) Sum = x[0] delta1 = dmin delta2 = dmin mult1 = 2 mult2 = 2 for i in range(0, Lx): #print abs(xsig[i] - Sum) if (abs(xsig[i] - Sum) < tol): bit = 0 delta2 = dmin delta1 = dmin elif (xsig[i] >= Sum): bit = 1 delta2 = dmin Sum += delta1 delta1 *= mult1 if delta1 > dmax: delta1 = dmax else: bit = -1 delta1 = dmin Sum -= delta2 delta2 *= mult2 if delta2 > dmax: delta2 = dmax ADMout[0, i] = Sum codevec[0, i]= bit return ADMout,codevec, x[0] def median_filter(v, K): v2 = zeros((len(v),)) for i in range(K, len(v)): seq = v[i-K:i] m = median(seq) v2[i-K:i] = m return v2 from models import ReverbModel if __name__=="__main__": if len(argv) != 2: print "[EE] Wrong number of arguments" print "[II] Correct syntax is:" print "[II] \t%s <trainingdir>" print "[II] where <trainingdir> is the directory containing the features files in the format *_features.yaml" print "[II] and the parameters in the format *_parameters.yaml" exit(-1) n_clusters = 3 UpsamplingFactor = 1 dmin = 0.001 dmax = 0.28 tol = 0.001 d1min = 0.01 d1max = 0.1 g1min = 0.01 g1max = 0.99 damin = 0.006 damax = 0.012 Gmin = 0.01 Gmax = 0.99 gcmin = 0.01 gcmax = 0.99 # For searching the directory from glob import glob traindir = argv[1] songs_in_dir = glob("%s/*_features.yaml" % traindir) print "[II] Using files: %s" % songs_in_dir chain_length=1 # asdsd # fullfeatures_pool = Pool() features_vector = None parameters_vector = None index_vector = None idx = 0 # Shuffle segments random.shuffle(songs_in_dir) for f_ in songs_in_dir: infile = f_ paramfile = "%s_parameters.yaml" % f_.split('_features.yaml')[0] print "[II] Using features file: %s" % infile print "[II] and parameters file: %s" % paramfile # paramfile = infile.split(') features_pool = YamlInput(filename = infile)() parameters_pool = YamlInput(filename = paramfile)() d1 = parameters_pool['parameters.d1'] da = parameters_pool['parameters.da'] g1 = parameters_pool['parameters.g1'] gc = parameters_pool['parameters.gc'] G = parameters_pool['parameters.G'] feature_captions = features_pool.descriptorNames() parameter_captions = ['d1','da','g1','gc','G'] svmhmmstr = "" for c in features_pool.descriptorNames(): if c.split('.')[0] == 'metadata' or c.split('.')[0] == 'parameter': feature_captions.remove(c) # close('all') # print "[II] Loaded training data from %s (%s) " % (infile, features_pool['metadata.date'][0]) print "[II] %d Features Available: " % len(feature_captions) print str(feature_captions).replace("', ","\n").replace('[','').replace("'","[II]\t ")[:-7] nfeatures_in = len(feature_captions) nparameters_in = 5 features_vector_one = zeros((nfeatures_in, len(features_pool[feature_captions[0]]))) index_vector_one = ones((len(features_pool[feature_captions[0]]),)) parameters_vector_one = ones((nparameters_in, len(features_pool[feature_captions[0]]))) # paramet for i in range(0, nfeatures_in): features_vector_one[i, :] = features_pool[feature_captions[i]].T # for i in range(0, nparameters_in): # parameters_vector[i, :] = features_pool[parameter_captions[0]].T parameters_vector_one[0, :] = d1*parameters_vector_one[0,:] parameters_vector_one[1, :] = g1*parameters_vector_one[1,:] parameters_vector_one[2, :] = da*parameters_vector_one[2,:] parameters_vector_one[3, :] = gc*parameters_vector_one[3,:] parameters_vector_one[4, :] = G*parameters_vector_one[4,:] # Stores example index number index_vector_one *= idx print "[II] %d parameters used:" % len(parameter_captions) print str(parameter_captions).replace("', ","\n").replace('[','').replace("'","[II]\t ")[:-7].replace('parameter.','') if features_vector is None: features_vector = features_vector_one else: features_vector = append(features_vector, features_vector_one, 1) if parameters_vector is None: parameters_vector = parameters_vector_one else: parameters_vector = append(parameters_vector, parameters_vector_one, 1) if index_vector is None: index_vector = index_vector_one else: index_vector = append(index_vector, index_vector_one) idx += 1 print "[II] Marking silent parts" features_full_nontransformed_train = features_vector.copy() # silent_parts = zeros((1, len(features_pool[feature_captions[i]].T))) silent_parts = zeros((1, features_vector.shape[1])) rms = features_vector[feature_captions.index('rms'), :] # Implementing Hysteresis Gate -- High threshold is halfway between # the mean and the max and Low is halfway between the mean dn the min rms_threshold_mean = mean(rms) rms_threshold_max = max(rms) rms_threshold_min = min(rms) rms_threshold_high = 0.1 * rms_threshold_mean rms_threshold_low = 0.01 * rms_threshold_mean for n in range(1, len(rms)): prev = rms[n-1] curr = rms[n] if prev >= rms_threshold_high: if curr < rms_threshold_low: silent_parts[0,n] = 1 else: silent_parts[0,n] = 0 elif prev <= rms_threshold_low: if curr > rms_threshold_high: silent_parts[0,n] = 0 else: silent_parts[0,n] = 1 else: silent_parts[0,n] = silent_parts[0,n-1] if silent_parts[0,1] == 1: silent_parts[0, 0] = 1 active_index = invert(silent_parts.flatten().astype(bool)) # Keep only active parts # Uncomment this features_vector = features_vector[:, active_index] # Keep this for debugging purposes parameters_vector = parameters_vector[:, active_index] # index_vector = index_vector[active_index] # Label examples for a chain of chain_length # idx = 0 # for i in range(0, len(index_vector)): # index_vector[i] = idx # if i % chain_length == 0: # idx += 1 # number_of_examples = idx # Scale parameters to [0,1] parameters_vector[0,:] = (parameters_vector[0,:] - d1min)/d1max #d1 parameters_vector[1,:] = (parameters_vector[1,:] - g1min)/g1max #g1 parameters_vector[2,:] = (parameters_vector[2,:] - damin)/damax #g1 parameters_vector[3,:] = (parameters_vector[3,:] - Gmin)/Gmax #G parameters_vector[4,:] = (parameters_vector[4,:] - gcmin)/gcmax #gc moments_vector = zeros((features_vector.shape[0], 2)) features_vector_original = features_vector.copy() print "[II] Storing moments vector" for i in range(0, features_vector.shape[0]): mean_ = mean(features_vector[i,:]) std_ = std(features_vector[i,:], ddof=1) moments_vector[i,0] = mean_ moments_vector[i,1] = std_ features_vector[i,:] = (features_vector[i,:] - mean_)/std_ features_vector_old_train = features_vector.copy() # moments_vector_train = moments_vector print "[II] Extracting PCA configuration " kernel, q, featurelist = extract_pca_configuration_from_data(features_vector) #q = 1 # q = q + 1 print "[II] Optimal number of PCs to keep: %d" % q feature_captions_array = array(feature_captions) features_to_keep = list(feature_captions_array[featurelist]) print "[II] Decided to keep %d features:" % len(features_to_keep) print str(features_to_keep).replace("', ","\n").replace('[','').replace("'","[II]\t ")[:-7] featurelist_bak = featurelist features_kept_data = features_vector[featurelist,:] features_kept_data_bak = features_kept_data.copy() # features_vector_old_train = features_kept_data features_vector = (kernel.T*features_kept_data)[0:q,:] features_vector_new_train = features_vector.copy() # example_features = None # example_parameters = None # example_idx = None # # for idx in range(0, shape(parameters_vector)[1]-chain_length): # example_features_ = features_vector[:, idx:idx+chain_length] # # print example_features_ # example_parameters_ = parameters_vector[:, idx:idx+chain_length] # example_idx_ = ones((shape(example_parameters_)[1],)) # # if example_features is None: # example_features = example_features_ # else: # #print "appending" # example_features = append(example_features, example_features_, 1) # # if example_parameters is None: # example_parameters = example_parameters_ # else: # example_parameters = append(example_parameters, example_parameters_, 1) # # if example_idx is None: # example_idx = example_idx_*idx # else: # example_idx = append(example_idx, example_idx_*idx, 1) # # # # # features_vector = example_features # parameters_vector = example_parameters # index_vector = example_idx print "[II] Trying ADM-coded parameters" print "[II] Upsampling features and parameters by a factor of %d" % UpsamplingFactor # Upsampled features and parameters features_vector_upsampled = smooth_matrix_1D(repeat(features_vector,UpsamplingFactor, axis=1)) # features_vector_upsampled = repeat(features_vector, UpsamplingFactor, axis=1) # features_vector_upsampled = smooth_matrix_1D(repeat(features_vector_old_train,UpsamplingFactor, axis=1)) # feature_labels_upsampled = repeat(features_clustering_labels,UpsamplingFactor, axis=0) parameters_vector_upsampled = repeat(parameters_vector,UpsamplingFactor, axis=1) # km = KMeans(init='k-means++', n_init=10, max_iter=300, tol=0.0000001, verbose = 0, n_clusters=12).fit(parameters_vector_upsampled.T) # centers_ = km.cluster_centers_ # labels__ = km.labels_ # Remove the following two lines in order to restore non-kmeans version # parameters_vector_kmeans_upsampled = array(centers_[labels__]) # # parameters_vector_upsampled = parameters_vector_kmeans_upsampled.T # # # parameters_vector_upsampled = smooth_matrix_1D(parameters_vector_upsampled) parameters_vector_upsampled_adm = matrix(zeros(shape(parameters_vector_upsampled))) parameters_vector_upsampled_code = matrix(zeros(shape(parameters_vector_upsampled))) parameters_vector_upsampled_firstval = matrix(zeros((parameters_vector_upsampled.shape[0],1))) # Reconstructed parameters parameters_vector_upsampled_reconstructed = matrix(zeros(shape(parameters_vector_upsampled))) # ADM HERE # def adm_matrix(X, dmin=0.001,dmax=0.28,tol=0.001): out = matrix(zeros(shape(X))) code = matrix(zeros(shape(X))) firstval = matrix(zeros((X.shape[0], 1))) for i in range(0, X.shape[0]): out[i,:], code[i,:], firstval[i,0] = adm(X[i,:],dmin=dmin,dmax=dmax,tol=tol) return out,code,firstval # parameters_vector_upsampled_reconstructed[i,:] = adm_reconstruct(parameters_vector_upsampled_code[i,:],parameters_vector_upsampled_firstval[i,0], dmin=dmin,dmax=dmax) def adm_matrix_reconstruct(code, firstval, dmin=0.001, dmax=0.28): X = matrix(zeros(shape(code))) for i in range(0, code.shape[0]): X[i,:] = adm_reconstruct(code[i,:], firstval[i,0], dmin=dmin, dmax=dmax) return X parameters_vector_upsampled_adm, parameters_vector_upsampled_code, parameters_vector_upsampled_firstval = adm_matrix(parameters_vector_upsampled, dmin, dmax, tol) # parameters_vector_upsampled = parameters_vector_upsampled_code def diff_and_pad(X): return concatenate(( zeros(( shape(X)[0], 1 )), diff(X, axis=1)), axis=1) # Change for adm, # parameters_vector_upsampled = parameters_vector_upsampled_code print "[II] Clustering features." # features_clustering = GMM(n_components = n_clusters, covariance_type='diag') # features_clustering.fit( features_vector_upsampled.T, y=parameters_vector_upsampled_code) # features_clustering_means = features_clustering.means_ features_clustering_labels = features_clustering.predict(features_vector_upsampled.T) features_clustering_sigmas = features_clustering.covars_ # features_vector_upsampled_estimated = zeros(shape(features_vector_upsampled)) # # for n in range(0, len(features_vector_upsampled_estimated[0])): features_vector_upsampled_estimated[:,n] = features_clustering_means[features_clustering_labels[n]] # # print "[II] Features MSE for %d clusters: %.3f" % (n_clusters, mse(features_vector_upsampled, features_vector_upsampled_estimated)) def happy_curve_classification(data, classes, estimator, Nd=1): print "[II] Generating Happy Curve " from copy import deepcopy estimator_fulldata = deepcopy(estimator) estimator_fulldata.fit(data, classes) labels = estimator.predict(data) # 1. Split data in two, training and testing Ntr = int(round(data.shape[0]/2)) # Training dataset size Nts = data.shape[0] - Ntr # Testing dataset size ratios = [] for n in range(Nd, Ntr): train, test, trainl, testl = cross_validation.train_test_split(data, classes, test_size = 0.5, random_state = 0) train = train[0:n,:] trainl = trainl[0:n] # print "trainl" # print trainl estimator_ = deepcopy(estimator) estimator_.fit(train,trainl) labels = estimator_.predict(test) ratio = sum(array(testl==labels).astype(float))/len(labels) ratios.append(ratio) return ratios def cross_validate_classification(data, classes, estimator): print "[II] Crossvalidating... " from copy import deepcopy estimator_fulldata = deepcopy(estimator) estimator_fulldata.fit(data, classes) percents = arange(0.1,0.9,0.1) MSEs = [] labels = estimator.predict(data) print "[II] for full training-testing: %.2f" % (sum(array(classes==labels).astype(float))/len(labels)) for p in percents: train,test,trainlabels,testlabels = cross_validation.train_test_split(data,classes,test_size=p,random_state=0) estimator_ = deepcopy(estimator) estimator_.fit(train, trainlabels) labels = estimator_.predict(test) print "[II] for training(%.2f)-testing(%.2f): %.2f" % ((1-p),p,sum(array(testlabels==labels).astype(float))/len(labels)) return MSEs def cross_validate_clustering(data, estimator): print "[II] Crossvalidating... " estimator_fulldata = estimator estimator_fulldata.fit(data) # labels = estimator_fulldata.predict(data) means = estimator_fulldata.means_ # print means percents = arange(0.1,0.6,0.1) MSEs = [] reconstructed = zeros(shape(data)) labels = estimator.predict(data) for n in range(0, len(reconstructed)): reconstructed[n,:] = means[labels[n]] MSEs.append(mse(data,reconstructed)) for p in percents: train,test = cross_validation.train_test_split(data,test_size=p,random_state=0) train = matrix(train) test = matrix(test) estimator.fit(train) means = estimator.means_ labels = estimator.predict(test) reconstructed = zeros(shape(test)) for n in range(0, len(reconstructed)): reconstructed[n,:] = means[labels[n]] m = mse(test,reconstructed) print "[II] MSE for clustering crossvalidated data %.2f-%.2f: %.5f" % ((1-p), p, m) MSEs.append(m) print "[II] Crossvalidation complete" return MSEs # Construct parameters alphabet, each symbol is going to be a different column vector # in parameter code matrix def vector_to_states(X): """ Input: a vector MxN with N samples and M variables Output: a codeword dictionary `parameters_alphabet', state_seq, inverse `parameters_alphabet_inv' """ parameters_alphabet = {} n = 0 for i in range(0, X.shape[1]): vec = tuple(ravel(X[:,i])) if vec not in parameters_alphabet: parameters_alphabet[vec] = n n += 1 parameters_alphabet_inv = dict([(parameters_alphabet[m],m) for m in parameters_alphabet]) state_seq = array([parameters_alphabet[tuple(ravel(X[:,m]))] for m in range(0, parameters_vector_upsampled_code.shape[1])] ) return state_seq, parameters_alphabet, parameters_alphabet_inv def states_to_vector(predicted, parameters_alphabet_inv): estimated = matrix(zeros((len(parameters_alphabet_inv[0]), len(predicted)))) for i in range(0, len(predicted)): # print "[II] Predicted: ", predicted[i] estimated[:, i] = matrix(parameters_alphabet_inv[predicted[i]]).T return estimated # state_seq, parameters_alphabet, parameters_alphabet_inv = vector_to_states(parameters_vector_upsampled_code) # parameters_change_variable = matrix(diff_and_pad(parameters_vector_upsampled)!=0).astype(int) # changes_state_seq, changes_parameters_alphabet, changes_parameters_alphabet_inv = vector_to_states(parameters_change_variable) # This is an hmm that just codes the changes" # We have only two states, change and stay the same. # Uncomment that here # parameters_vector_upsampled = parameters_vector_upsampled_code # parameters_state_orig, parameter_state_alphabet_orig, parameter_state_alphabet_inv_orig = vector_to_states(parameters_vector) # TODO: HA # parmaeters_vector_upsampled = parameters_vector_upsampled_code parameters_state, parameter_state_alphabet, parameter_state_alphabet_inv = vector_to_states(parameters_vector_upsampled) # Remove outliers ee = EllipticEnvelope() # inliers = ee.fit(features_vector_upsampled.T).predict(features_vector_upsampled.T) == 1 inoutliers = zeros((len(parameters_state),)) # for p in unique(parameters_state): ee = EllipticEnvelope(contamination=0.1) inoutliers[parameters_state == p] = ee.fit(features_vector_upsampled[:, parameters_state == p].T).predict(features_vector_upsampled[:, parameters_state == p].T) # # # Label inliers and outliers inliers = inoutliers > 0 outliers = inoutliers < 0 # # # # Do pre-processing and re-label the outliers #completeavac = SinkHoleClassifier() #completeavac.fit(features_vector_upsampled[:, inliers].T, parameters_state[inliers]) #parameters_state_original = parameters_state.copy() #parameters_state[outliers] = completeavac.predict(features_vector_upsampled[:,outliers].T) # features_vector_upsampled = features_vector_upsampled[:, inliers] # parameters_state = parameters_state[inliers] # print "UNIQUE PARAMETERS STATE" # print unique(parameters_state) # asdasdasd print "[II] Testing Gaussian Naive Bayes Classifier" gnb = GaussianNB() gnb.fit(features_vector_upsampled.T, parameters_state) parameters_state_estimated = gnb.predict(features_vector_upsampled.T) output = states_to_vector(parameters_state_estimated, parameter_state_alphabet_inv) figure() subplot(211) plot(parameters_vector_upsampled.T) title('Parameter value')# % UpsamplingFactor) ylabel('value') xlabel('frame \#') subplot(212) #plot(smooth_matrix_1D(output).T) plot(output.T) ylabel('value') xlabel('frame \#') errorvector_nbc = [mse(output[:,i], parameters_vector_upsampled[:,i]) for i in range(0, len(parameters_state_estimated))] #cross_validate_classification(features_vector_upsampled.T, parameters_state, gnb) # hc = happy_curve_classification(features_vector_upsampled.T, parameters_state, gnb) # # figure() # plot(hc) # figure() print "[II] Trying Multinomial HMM" # In order to do classification with HMMs, we need to: # 1. Split the parameters into classes # 2. Train one model per class # 3. Feed our data to all the models # 4. Check which has a better score and assig,n to M class SVMClassifier: def __init__(self,**kwargs): print "[II] SVM Classifier " # self.clf = svm.SVC(**kwargs) self.name = "SVM" self.smallname = "svm" self.clf = multiclass.OneVsOneClassifier(svm.SVC(**kwargs) ) def fit(self, X, classes): n_classes = max(unique(classes))+1 self.clf.fit(X,classes) def predict(self, X): return self.clf.predict(X) def getName(self): return self.name def cross_validate(self, data, classes): print "[II] SVN Classifier Crossvalidating... " from copy import deepcopy estimator = deepcopy(self) estimator_fulldata = deepcopy(self) estimator_fulldata.fit(data, classes) percents = arange(0.1,0.9,0.1) self.percents = percents * 100 RATIOS = [] labels = estimator.predict(data) print "[II] for full training-testing: %.2f" % (sum(array(classes==labels).astype(float))/len(labels)) for p in percents: traindata,testdata,trainlabels,testlabels = cross_validation.train_test_split(data,classes,test_size=p,random_state=1) # print "[II] [%.2f-%.2f] Examples used for training: %s" % (p, (1-p), str(traindata)) # print "[II] [%.2f-%.2f] Examples used for testing: %s" % (p, (1-p), str(testdata)) # Training phase estimator_ = deepcopy(estimator) estimator_.fit(traindata, trainlabels) predicted_labels = estimator_.predict(testdata) m = sum(array(predicted_labels==testlabels).astype(float))/len(testlabels) print "[II] [%.2f-%.2f] 5-fold cross-validation for SVN Classifier: %.1f" % (p, (1-p), m) RATIOS.append(m) return RATIOS,percents class HmmClassifier2: def __init__(self, N=2, chain_size=4, n_components = 1): print "[II] HMM Classifier (%d states, %d components)" % (N, n_components) self.name = "HMM (%d states, %d components)" % (N, n_components) self.n_components = n_components self.chain_size = chain_size self.hmms_ = [] self.N = N self.n_states = N def getName(self): return self.name def fit(self, features, parameters): self.n_params = len(unique(parameters)) for n in range(0, self.n_params): hmm_ = hmm.GaussianHMM(n_components = self.n_components, covariance_type='full') # Get training data for each parameter for i in range(0, len(parameters)-self.chain_size): seq = features[i:i+self.chain_size, :] if parameters[i+self.chain_size-1] == n and seq.shape[0] == self.chain_size: # print "[II] Fitting: %s" % str(seq) hmm_.fit([seq]) self.hmms_.append(hmm_) def predict(self, X): labels = zeros((X.shape[0],)) N = self.N scores = zeros((self.n_states, )) for i in range(0, len(labels)): testdata = X[i:i+self.chain_size,:] for j in range(0, self.n_states): scores[j] = self.hmms_[j].score(testdata) labels[i] = argmax(scores) return labels def cross_validate(self, data, classes): print "[II] SVN Classifier Crossvalidating... " from copy import deepcopy estimator = deepcopy(self) estimator_fulldata = deepcopy(self) estimator_fulldata.fit(data, classes) percents = arange(0.1,0.9,0.1) self.percents = percents * 100 RATIOS = [] labels = estimator.predict(data) print "[II] for full training-testing: %.2f" % (sum(array(classes==labels).astype(float))/len(labels)) for p in percents: traindata,testdata,trainlabels,testlabels = cross_validation.train_test_split(data,classes,test_size=p,random_state=1) # print "[II] [%.2f-%.2f] Examples used for training: %s" % (p, (1-p), str(traindata)) # print "[II] [%.2f-%.2f] Examples used for testing: %s" % (p, (1-p), str(testdata)) # Training phase estimator_ = deepcopy(estimator) estimator_.fit(traindata, trainlabels) predicted_labels = estimator_.predict(testdata) m = sum(array(predicted_labels==testlabels).astype(float))/len(testlabels) print "[II] [%.2f-%.2f] 5-fold cross-validation for HMM2 Classifier: %.1f" % (p, (1-p), m) RATIOS.append(m) return RATIOS,percents class HmmClassifier: def __init__(self, N=2, n_components = 1): print "[II] HMM Classifier (%d states, %d components)" % (N, n_components) self.name = "HMM (%d states, %d components)" % (N, n_components) self.n_components = n_components self.chain_size = N self.hmms_ = [] self.N = N def getName(self): return self.name def fit(self, X, states): self.n_states = len(unique(states)) for n in range(0, self.n_states): hmm_ = hmm.GaussianHMM(n_components = self.n_components, covariance_type = 'full') # Get training data for each class vector = X[states == n,:] # Fit the HMM # print vector hmm_.fit([vector]) # And append to the list self.hmms_.append(hmm_) def predict(self,X): labels = zeros((X.shape[0],)) N = self.N m = 0 scores = zeros((1, self.n_states)) while m*N < X.shape[0]: if (m+1)*N > X.shape[0]: testdata = X[m*N:,:] else: testdata = X[m*N:(m+1)*N,:] # print testdata for i in range(0, self.n_states): scores[0,i] = self.hmms_[i].score(testdata) if (m+1)*N > X.shape[0]: labels[m*N:] = argmax(scores) else: labels[m*N:(m+1)*N] = argmax(scores) m+= 1 return labels # def cross_validate(self, data, classes, index_vector): # print "[II] Crossvalidating... " # from copy import deepcopy # estimator = deepcopy(self) # estimator_fulldata = deepcopy(self) # estimator_fulldata.fit(data, classes) # # percents = arange(0.1,0.9,0.1) # self.percents = percents * 100 # RATIOS = [] # labels = estimator.predict(data) # # print "[II] for full training-testing: %.2f" % (sum(array(classes==labels).astype(float))/len(labels)) # # for p in percents: # train,test,trainlabels,testlabels = cross_validation.train_test_split(data,classes,test_size=p,random_state=1) # estimator_ = deepcopy(estimator) # estimator_.fit(train, trainlabels) # labels = estimator_.predict(test) # m = sum(array(testlabels==labels).astype(float))/len(labels) # RATIOS.append(m) # print "[II] for training(%.2f)-testing(%.2f): %.2f" % ((1-p),p,sum(array(testlabels==labels).astype(float))/len(labels)) # # return RATIOS, percents def cross_validate(self, data, classes): print "[II] SVN Classifier Crossvalidating... " from copy import deepcopy estimator = deepcopy(self) estimator_fulldata = deepcopy(self) estimator_fulldata.fit(data, classes) percents = arange(0.1,0.9,0.1) self.percents = percents * 100 RATIOS = [] labels = estimator.predict(data) print "[II] for full training-testing: %.2f" % (sum(array(classes==labels).astype(float))/len(labels)) for p in percents: traindata,testdata,trainlabels,testlabels = cross_validation.train_test_split(data,classes,test_size=p,random_state=1) # print "[II] [%.2f-%.2f] Examples used for training: %s" % (p, (1-p), str(traindata)) # print "[II] [%.2f-%.2f] Examples used for testing: %s" % (p, (1-p), str(testdata)) # Training phase estimator_ = deepcopy(estimator) estimator_.fit(traindata, trainlabels) predicted_labels = estimator_.predict(testdata) m = sum(array(predicted_labels==testlabels).astype(float))/len(testlabels) # testdata = None # # traindata = None # trainlabels = None # testlabels = None # # for t in train: # if traindata is None: # traindata = data[index_vector == t, :] # trainlabels = classes[index_vector == t] # else: # traindata = append(traindata, data[index_vector == t, :], 0) # trainlabels = append(trainlabels, classes[index_vector ==t]) # # estimator_ = deepcopy(estimator) # estimator_.fit(traindata, trainlabels) # # # for t in test: # if testdata is None: # testdata = data[index_vector == t, :] # testlabels = classes[index_vector == t] # else: # testdata = append(testdata, data[index_vector == t,:], 0) # testlabels = append(testlabels, classes[index_vector == t]) # predicted_labels = estimator_.predict(testdata) m = sum(array(predicted_labels==testlabels).astype(float))/len(testlabels) print "[II] [%.2f-%.2f] 5-fold cross-validation for HMM Classifier: %.1f" % (p, (1-p), m) RATIOS.append(m) return RATIOS,percents def cross_validate_full_k_fold(data, classes, list_of_classifiers, K=10): """ K-Folds Cross-validates all available classifiers, plots and saves barplots. """ ratios = [] for l in list_of_classifiers: ratio = l.k_fold_cross_validate(data, classes, K) ratios.append(ratio) L = len(ratios) ind = arange(L) fig, ax = subplots() rects = [] width = 10/float(len(ratios)) colors = ['r', 'y', 'g', 'c'] labels_ = [] for i in range(0, len(ratios)): rects.append(ax.bar(i*width,ratios[i],width,color=colors[i % len(colors)])) labels_.append(list_of_classifiers[i].getName()) return ratios def cross_validate_full(seq, classes, list_of_classifiers): """ Cross-validates all available classifiers, plots and saves barplots. """ ratios = [] percents = [] for l in list_of_classifiers: ratios_, percents_ = l.cross_validate(seq, classes) ratios.append(ratios_) percents.append(percents_) L = len(ratios[0]) ind = arange(L) fig,ax = subplots() rects = [] W = 0.8 width = W/float(len(ratios)) colors = ['r', 'y', 'g', 'c'] labels_ = [] for i in range(0, len(ratios)): rects.append(ax.bar(ind+i*width,ratios[i],width,color=colors[i % len(colors)])) labels_.append(list_of_classifiers[i].getName()) ax.legend(tuple(labels_)) ax.set_xticks(ind+W/2) ax.set_xticklabels(tuple(percents[0]*100)) ax.set_xlabel("Percentage % of test data") ax.set_ylabel("Success ratio") # rects1 = ax.bar(ind,ratios[0],0.35,color='r') # rects2 = ax.bar(ind+0.351,ratios[1],0.35,color='y') return ratios # hmmc = HmmClassifier(N = len(unique(parameters_state)), n_components = 1) # hmmc.fit(features_vector_upsampled.T, parameters_state) # # hmmc2 = HmmClassifier(N = 12, n_components = 4) # hmmc2.fit(features_vector_upsampled.T, parameters_state) # svmc = SVMClassifier(probability=True) svmc.fit(features_vector_upsampled.T, parameters_state) # # svmc.cross_validate(features_vector_upsampled.T, parameters_state, index_vector) # nbc = NBClassifier() nbc.fit(features_vector_upsampled.T, parameters_state) # # #hchmmc = happy_curve_classification(features_vector_upsampled.T, parameters_state, hmmc, Nd=100) # # cross_validate_classification(features_vector_upsampled.T, parameters_state, hmmc) # # #hmmc.cross_validate(features_vector_upsampled.T, parameters_state) # hmmc3 = HmmClassifier3(N=2, n_components = max(unique(parameters_state))) #print(parameters_state) plot(parameters_state) print(shape(features_vector_upsampled)) obs = hmmc3.fit(features_vector_upsampled.T, parameters_state) # svmhmm = HmmSVMClassifier() # svmhmm.fit(features_vector_upsampled.T, parameters_state) # ratios = cross_validate_full(features_vector_upsampled.T, parameters_state, [nbc, svmc]) # ratios = cross_validate_full_k_fold(features_vector_upsampled.T, parameters_state, # [hmmc3]) def approximate_chain_size(data,parameters): ratios = [] # chainsizes = arange(1, 40) # for cs in chainsizes: # print "[II] Trying chain size: %d" % cs # hmmc = HmmClassifier3(N=cs, n_components = 2) # ratios.append(hmmc.k_fold_cross_validate(data,parameters,K=10)) componentssize = arange(1, 12) for cs in componentssize: print "[II] Trying component size: %d" % cs hmmc = HmmClassifier3(N=2, n_components = cs) ratios.append(hmmc.k_fold_cross_validate(data, parameters, K=2)) figure() plot(componentssize, ratios) xlabel('Chain Size') ylabel('Success Ratio') title('10-Fold cross validation success ratio vs chain size') return ratios # ratios = approximate_chain_size(features_vector_upsampled.T, parameters_state) #predictedhmmc2 = hmmc2.predict(features_vector_upsampled.T) # figure() # hmms_ = [] # # for n in range(0, len(parameter_state_alphabet)): # #hmm_ = hmm.GMMHMM(n_components = 1, n_mix = 2) # hmm_ = hmm.GaussianHMM(n_components = 1,covariance_type = 'full') # # # Get training data for each class # vector = features_vector_upsampled[:,parameters_state == n] # # #if vector.shape[1] < n_clusters: # # hmms_.append(None) # #else: # # hmm_.fit([vector.T]) # # # Append to the list # # hmms_.append(hmm_) # # labels = zeros((features_vector_upsampled.shape[1],)) # # N = 20 # m = 0 # # while m*N < features_vector_upsampled.shape[1]: # # scores = zeros((1, len(parameter_state_alphabet))) # # if (m+1)*N > features_vector_upsampled.shape[1]: # testdata = features_vector_upsampled[:,m*N:] # else: # testdata = features_vector_upsampled[:,m*N:(m+1)*N] # # for i in range(0, len(parameter_state_alphabet)): # if hmms_[i] is not None: # scores[0,i] = hmms_[i].score(testdata.T) # else: # scores[0,i] = -100000 # Very large negative score # if (m+1)*N >= features_vector_upsampled.shape[1]: # labels[m*N:] = argmax(scores) # else: # labels[m*N:(m+1)*N] = argmax(scores) # # m += 1 # figure() #plot(labels.T) # labels = hmmc.predict(features_vector_upsampled.T) # estimated = states_to_vector(labels,parameter_state_alphabet_inv) # plot(estimated.T,'r--') # # title('Estimated parameter values') # legend(['Naive Bayes Classifier', 'HMM chain size %d (%.1fms)' % (N, float(N)/UpsamplingFactor*23.0)]) # # ylabel('value') # xlabel('frame #') # # close('all') # plot(features_clustering_labels/float(max(features_clustering_labels))) # plot(parameters_vector_upsampled.T/max(ravel(parameters_vector_upsampled))) def plot_clusters(x, labels): figure() symbols = ['>', 'x', '.', '<','v'] colors = ['b', 'r', 'g', 'm','c'] for r in range(0, x.shape[0]): scatter(x[r,0], x[r,1], c=colors[int(labels[r]) % len(colors)], marker=symbols[int(labels[r]) % len(symbols)]) # plot_clusters(features_vector_upsampled.T, parameters_state) # # SVM class HmmClassifierSVN_Naive: def __init__(self, N=2, n_components = 1): self.n_components = n_components self.chain_size = N self.hmms_ = [] self.N = N def fit(self, X, states): self.n_states = len(unique(states)) for n in range(0, self.n_states): hmm_ = hmm.GaussianHMM(n_components = self.n_components, covariance_type = 'full') # Get training data for each class vector = X[states == n,:] # Fit the HMM # print vector hmm_.fit([vector]) # And append to the list self.hmms_.append(hmm_) def predict(self,X): labels = zeros((X.shape[0],)) N = self.N m = 0 scores = zeros((1, self.n_states)) while m*N < X.shape[0]: if (m+1)*N > X.shape[0]: testdata = X[m*N:,:] else: testdata = X[m*N:(m+1)*N,:] # print testdata for i in range(0, self.n_states): scores[0,i] = self.hmms_[i].score(testdata) if (m+1)*N > X.shape[0]: labels[m*N:] = argmax(scores) else: labels[m*N:(m+1)*N] = argmax(scores) m+= 1 return labels def plot_parameters_estimation(list_of_estimators): for e in list_of_estimators: name_ = e.getName() fig = figure() fig.suptitle('Parameters estimation using %s' % name_, fontweight='bold') subplot(311) title ('original parameters') param_real = states_to_vector(parameters_state,parameter_state_alphabet_inv).T plot(param_real) xlabel('sample') ylabel('normalized parameter value') legend(parameter_captions) subplot(312) title('estimated parameters') pred = e.predict(features_vector_upsampled.T) param_est = states_to_vector(pred,parameter_state_alphabet_inv).T plot(param_est) xlabel('sample') ylabel('normalized parameter value') legend(parameter_captions) subplot(313) error_real_est = zeros((len(param_est),)) for i in range(0, len(error_real_est)): error_real_est[i] = mse(param_real[i],param_est[i]) totalmse = sum(error_real_est) title('mean squared error (total: %.3f)' % totalmse) plot(error_real_est) xlabel('sample') ylabel('normalized parameter value') # hmm2 = hmm.MultinomialHMM(n_components = n_classes, startprob = prior_p_q, transmat = transmat ) # TODO: Embed SVM emissions to HMM. Try to do pick maximum likelihood sequence from # classifiers like in HmmClassifier3. For every class, train an SVM, with n_components as # states, and compute the log-likelihood for prediction. Should work. # class HMMsvmClassifier: # def __init__(self, N=6): # print "[II] HMM-SVM Classifier (%d time steps)" % N # self.name = "HMM-SVM (%d time steps)" % N # # self.n_components = n_components # self.chain_size = N # self.svm = MyAVAClassifier() # # # def getName(self): # return self.name # # def fit(self, features, parameters): # # # First train emission svm # # self.svm.fit(features, parameters) # # # print parameters # n_classes = max(unique(parameters)) + 1 # # print "[II] Number of classes: ", n_classes # hmms = [None]*n_classes # chain_size = self.chain_size # params = [None]*n_classes # obs = [None]*n_classes # # for i in range(chain_size, len(parameters)): # class_ = parameters[i-1] # params_ = parameters[i-chain_size:i] # feats_ =features[i-chain_size:i,:] # # if params[class_] is None: # params[class_] = [params_] # obs[class_] = [feats_] # else: # params[class_].append(params_) # obs[class_].append(feats_) # # # # for i in range(0, len(params)): # if params[i] is not None and len(params[i]) != 0: # hmm_ = HMMsvm(self.svm) # # print "[II] %d Fitting params: " % i, params[i] # print "[!!] OBSERVATIONS: " # print obs[i] # hmm_.fit(obs[i], params[i]) # # print "[!!] PARAMETERS: " # print params[i] # # # TODO: Left here on 06/07 19:56 # # # hmm_.fit(features,) # # print "obs: ", obs[i] # # print "params: ", params[i] # # hmm_.fit(obs[i], params[i],flr=1e-9) # hmms[i] = hmm_ # # self.hmms = hmms # # return obs # def predict(self, features, mfilt=20): # chain_size = self.chain_size # hmms = self.hmms # predicted_classes = zeros((features.shape[0],)) # for i in range(chain_size, features.shape[0]): # scores = zeros((len(hmms),)) # # seq = features[i-chain_size:i, :] # # for j in range(0, len(hmms)): # if hmms[j] is not None: # q,p = hmms[j].logviterbi(seq) # # scores[j] = logsumexp(p) # else: # scores[j] = -infty # # predicted_classes[i] = argmax(scores) # # # Do a median filter at the end # # # predicted_classes = median_filter(predicted_classes,mfilt) # # return predicted_classes class HmmSvmClassifier3: def __init__(self, N=2,n_components = 1): print "[II] HMM/SVM Classifier (%d states, %d components)" % (N, n_components) self.name = "HMM (%d time steps, %d components)" % (N, n_components) self.n_components = n_components self.smallname = "hmmsvmc" self.chain_size = N def getName(self): return self.name def fit(self, features, parameters): # print parameters n_classes = max(unique(parameters)) + 1 if n_classes == 1: self.only_one_class = True return else: self.only_one_class = False print "[II] Number of classes: ", n_classes hmms = [None]*n_classes # svms = [None]*n_classes chain_size = self.chain_size obs = [None]*n_classes cls = [None]*n_classes for i in range(chain_size, len(parameters)): class_ = parameters[i-1] seq = features[i-chain_size:i,:] params = parameters[i-chain_size:i] if obs[class_] is None: obs[class_] = [seq] cls[class_] = [params] else: obs[class_].append(seq) cls[class_].append(params) for i in range(0, len(obs)): if obs[i] is not None and len(obs[i]) != 0: # hmm_ = hmm.GaussianHMM(n_components=self.n_components, covariance_type='full') svm_ = MyAVAClassifier() print obs[i] print cls[i] O = obs[i][0] C = cls[i][0] for j in range(0, len(obs[i])): O = append(O, obs[i][j], 1) C = append(C, cls[i][j], 1) print O print C svm_.fit(O.T,C) hmm_ = HMMsvm(svm_) hmm_.fit(obs[i],cls[i]) hmms[i] = hmm_ self.hmms = hmms return obs def predict(self, features, mfilt=20): if self.only_one_class == True: return zeros((features.shape[0], )) chain_size = self.chain_size hmms = self.hmms predicted_classes = zeros((features.shape[0],)) for i in range(chain_size, features.shape[0]): scores = zeros((len(hmms),)) seq = features[i-chain_size:i, :] for j in range(0, len(hmms)): if hmms[j] is not None: print hmms[j].score(seq) scores[j] = hmms[j].score(seq)[j] else: scores[j] = -infty predicted_classes[i] = argmax(scores) # Do a median filter at the end # predicted_classes = median_filter(predicted_classes,mfilt) return predicted_classes # newstates = self.logviterbi() # # # # Alphas and Betas # X = features # alphas = zeros((T,n_classes)) # betas = zeros((T,n_classes)) # # forward_messages = zeros((T,n_classes)) # backward_messages = zeros((T, n_classes)) # # print "[II] Computing alpha, beta..." # for t in range(1, T+1): # forward_messages[t-1,:] = self._forward_message(forward_messages[t-2,:],X[0:t,]) # backward_messages[-t,:] = self._backward_message(backward_messages[-t+1], X[-t:,:]) # # # print "[II] Computing ksu..." # # a_norm = forward_messages[-1,n_classes-1] # alpha_T # # ksu = zeros((T, n_classes, n_classes)) # # for i in range(0, n_classes): # for j in range(0, n_classes): # for t in range(0, T-1): # ksu[t,i,j] = exp(forward_messages[t, i] + log(transmat[i,j]) + log(self.estimate_emission_probability(X[t,:],j)) + backward_messages[t+1,j] - a_norm) # # # # self.alphas = forward_messages # self.betas = backward_messages # self.ksu = ksu # # transmat_new = zeros((n_classes,n_classes)) # # for i in range(0, n_classes): # # for j in range(0, n_classes): # transmat_new[i,j] = sum(ksu[:,i,j])/sum(ksu[:,i,:]) # # self.transmat_new = transmat_new # # # def forward(self, X): # # Compute log likelihood using the forward algorithm # # # Number of states # N = self.n_classes # # # Number of Observations # T = X.shape[0] # # # transmat = self.transmat # # # Initialization # # a1 = ones((N,))/N # a1 = self.prior # # alpha = zeros((N,)) # for i in range(0, N): # alpha[i] = log(a1[i])+log(self.estimate_emission_probability(X[0,:], i)) # # print alpha # # Recursion # for t in range(0, T): # alpha_old = alpha.copy() # x = X[t, :] # for j in range(0, N): # tosum = zeros((N,)) # for i in range(0, N): # tosum[i] = alpha_old[i]+log(transmat[i,j]) # # # Sum = logsum(tosum) # Sum = logsumexp(tosum) # bj = self.estimate_emission_probability(x, j) # # alpha[j] = Sum+log(bj) # # print alpha # # tosum = zeros((N,)) # for i in range(0, N): # tosum[i] = alpha[i] + log(transmat[i,N-1]) # # return tosum # # def backward(self, X): # # Number of states # N = self.n_classes # # # Number of Observations # T = X.shape[0] # # transmat = self.transmat # # # Initialization # # b1 = ones((N,))/N # # beta = zeros((N,)) # for i in range(0, N): # beta[i] = log(b1[i])+log(self.estimate_emission_probability(X[-1,:], i)) # # for t in range(0, T): # beta_old = beta.copy() # x = X[-t, :] # for j in range(0, N): # tosum = zeros((N, )) # for i in range(0, N): # tosum[i] = beta_old[i]+log(transmat[i,j]) # # Sum = logsumexp(tosum) # bj = self.estimate_emission_probability(x, j) # beta[j] = Sum+log(bj) # # tosum = zeros((N,)) # # for i in range(0, N): # tosum[i] = beta[i] + log(transmat[i,0]) # # #3 p = logsumexp(tosum) # # return tosum def _log_likelihood(self, X): return logsumexp(self.forward(X)) def _likelihood(self, X): # Number of states N = self.n_classes # Number of Observations T = X.shape[0] transmat = self.transmat # Initialization # a1 = ones((N,))/N a1 = self.prior alpha = zeros((N,)) for i in range(0, N): alpha[i] = a1[i]*self.estimate_emission_probability(X[0,:], i) # Recursion print alpha for t in range(1, T): alpha_old = alpha.copy() x = X[t, :] for j in range(0, N): Sum = 0 for i in range(0, N): Sum += alpha_old[i]*transmat[i,j] alpha[j] = Sum*self.estimate_emission_probability(x, j) print alpha # Termination Sum = 0 for i in range(0, N): Sum += alpha[i]*transmat[i, N-1] p = Sum return p # def fit(self, X, states): # # print parameters # n_classes = max(unique(states)) + 1 # # # svms = [None]*n_classes # obs = [None]*n_classes # sta = [None]*n_classes # chain_size = self.chain_size # # #22 obs = None # # sta = None # # print "[II] Number of classes: ", n_classes # # # For each class: # # Concatenate examples # # Fit SVM # # for i in range(chain_size, len(states)): # class_ = states[i-1] # seq = X[i-chain_size:i, :] # states_ = states[i-chain_size:i] # # # if obs[class_] is None: # obs[class_] = [seq] # sta[class_] = [states_] # self.svms.append(MyAVAClassifier()) # else: # obs[class_].append(seq) # sta[class_].append(states_) # # # for i in range(0, len(obs)): # # o = None # s = None # # for j in range(0, len(obs[i])): # if o is None: # o = obs[i][j] # s = sta[i][j] # # else: # o = append(o, obs[i][j],0) # s = append(s, sta[i][j]) # # # self.svms[i].fit(o, s) # def predict(self, features): # chain_size = self.chain_size # svms = self.svms # # predicted_classes = zeros((features.shape[0],)) # for i in range(chain_size, features.shape[0]): # scores = zeros((len(svms))) # # seq = features[i-chain_size:i, :] # # for j in range(0, len(svms)): # if svms[j] is not None: # scores[j] = svms[j].compute_log_likelihood(seq) # else: # scores[k] = -infty # predicted_classes[i] = argmax(scores) # # return predicted_classes # Marginalize over the latent variable # for i in range(0, X.shape[0]): # P = zeros((self.n_classes,)) # x = X[i,:] # for j in range(0, len(P)): # P[j] = self.estimate_emission_probability(j, x) # # S[i] = log(sum(P[j])) # # return sum(S) # Continue from here X = features_vector_upsampled.T y = parameters_state # clf = svm.SVC() # clf.fit(X,y) # parameters_state_y = clf.predict(X) predhmmc3 = hmmc3.predict(features_vector_upsampled.T) myava = MyAVAClassifier() myava.fit(features_vector_upsampled.T, parameters_state) predmyava = myava.predict(features_vector_upsampled.T) # hmmsvmc = HMMsvmClassifier(N=2) # hmmsvmc.fit(features_vector_upsampled.T, parameters_state) #predhmmsvmc = hmmsvmc.predict(features_vector_upsampled.T) obsc = MyAVAClassifier() obsc.fit(features_vector_upsampled.T, parameters_state) # hmm2 = hmm.GaussianHMM(n_components=6) # hmm2.fit([features_vector_upsampled.T]) # hmm3 = HMMsvm(obsc) # hmm3.fit([features_vector_upsampled.T],[parameters_state]) # predhmmsvm = hmm3.predict(features_vector_upsampled.T) # hmmsvmc = HMMsvmClassifier() hmmsvmc = HMMsvmClassifier() # hmmsvmc = HmmSvmClassifier3() hmmsvmc.fit(features_vector_upsampled.T, parameters_state) predhmmsvm = hmmsvmc.predict(features_vector_upsampled.T) plot_parameters_estimation([nbc, myava, hmmc3, hmmsvmc]) #plot(parameters) figure() plot(parameters_state,'b') # plot(parameters_state_y,'r--') plot(predhmmc3,'g+-') plot(predhmmsvm, 'co-') # plot(predhmmsvmc, 'bo-') legend(['Real', 'SVM', 'HMM','HMM-SVM']) xlabel('sample') ylabel('state') # TODO, HMM with SVN, Cross-validation # Using hidden markov svm svmhmm = "" # print "[II] Success ratio for SVM-RBF Classifier: %.2f" % (sum(parameters_state==parameters_state_y).astype(float)/float(len(parameters_state))) print "[II] All-vs-All custom Support Vector Classifier Success Ratio: %.2f " %(sum(predmyava==parameters_state).astype(float)/len(parameters_state)) print "[II] Success ratio for HMM: %.2f" % (sum(parameters_state==predhmmc3).astype(float)/float(len(predhmmc3))) print "[II] Success ratio for HMM-SVM: %.2f" % (sum(parameters_state==predhmmsvm).astype(float)/float(len(predhmmsvm))) model_ghmm = ReverbModel("GaussianHMM Classifier Model", kernel, q, features_to_keep, hmmc3, parameter_dictionary=parameter_state_alphabet_inv, moments_vector=moments_vector) model_gnb = ReverbModel("Naive Bayes Classifier Model", kernel, q, features_to_keep, nbc, parameter_dictionary=parameter_state_alphabet_inv, moments_vector=moments_vector ) model_svm = ReverbModel("AVA LinearSVM Classifier Model", kernel, q, features_to_keep, myava, parameter_dictionary=parameter_state_alphabet_inv, moments_vector=moments_vector ) model_hmmsvm = ReverbModel("HMM0SVM Classifier Model", kernel, q, features_to_keep, hmmsvmc, parameter_dictionary=parameter_state_alphabet_inv, moments_vector=moments_vector ) model_ghmm.save("model_ghmm.dat") model_gnb.save("model_gnb.dat") model_svm.save("model_svm.dat") model_hmmsvm.save("model_hmmsvm.dat") def split_data_to_k_samples(data, params, K): L = shape(data)[0] M = int(round(L/float(K))) samples_data = [] samples_parameters = [] for k in range(K): if (k+1)*M > L: samples_data.append(data[k*M:,:]) samples_parameters.append(params[k*M:]) else: samples_data.append(data[k*M:(k+1)*M,:]) samples_parameters.append(params[k*M:(k+1)*M]) return samples_data, samples_parameters def join_data(data, params): L = 0 for p in params: L += len(p) new_data = zeros((L, data[0].shape[1])) new_params = zeros((L,)) idx = 0 for n in range(0,len(data)): new_data[idx:idx+data[n].shape[0],:] = data[n] new_params[idx:idx+len(params[n])] = params[n] idx += len(params[n]) return new_data, new_params cross_validation_ratios = {} cross_validation_mses = {} features_samples, parameters_samples = split_data_to_k_samples(features_vector_upsampled.T, parameters_state, 5) from sklearn.cross_validation import KFold from sklearn import metrics KF = 10 s1,s2 = split_data_to_k_samples(features_vector_upsampled.T, parameters_state, KF) r1,r2 = join_data(s1,s2) kf = KFold(KF, n_folds=KF) # Leave one out trial = 1.0 rhmmc = 0 rgnbc = 0 ravac = 0 rhmmsvmc = 0 msegbnc = 0 msehmmc = 0 mseavac = 0 msehmmsvmc = 0 fullratios = [] fullmses = [] def calculate_mse(pred, real): realparams = states_to_vector(real, parameter_state_alphabet_inv) estparams = states_to_vector(pred, parameter_state_alphabet_inv) return mse(realparams, estparams) ratios_gnb = [] ratios_hmm = [] ratios_svm = [] ratios_hmmsvm = [] ratios_sinkhole = [] mse_gnb = [] mse_hmm = [] mse_svm = [] mse_hmmsvm = [] mse_sinkhole = [] for train, test in kf: train_idx = 100 print train,test f1 = [] p1 = [] tf1 = [] tp1 = [] for i in train: f1.append(s1[i-1]) p1.append(s2[i-1]) for i in test: tf1.append(s1[i-1]) tp1.append(s2[i-1]) new_trainset_data, new_trainset_params = join_data(f1, p1) new_trainset_params = new_trainset_params.astype(int) # Detect and remove outliers # ee = EllipticEnvelope() # ee.fit(new_trainset_data, new_trainset_params) # inliers = ee.predict(new_trainset_data) == 1 # new_trainset_data = new_trainset_data[inliers, :] # new_trainset_params = new_trainset_params[inliers] new_testset_data, new_testset_params = join_data(tf1, tp1) new_testset_params = new_testset_params.astype(int) param_est = states_to_vector(new_testset_params,parameter_state_alphabet_inv).T X = arange(50, len(new_trainset_params), 50) print X # X = append(X, [len(new_trainset_params)], 1) # for M in X: # print CHAINSIZE # print NCOMPONENTS hmmc = HmmClassifier3(N=CHAINSIZE, n_components = NCOMPONENTS) gnbc = GaussianNB() avac = MyAVAClassifier() hmmsvmc = HMMsvmClassifier() gnbc.fit(new_trainset_data[0:M,:], new_trainset_params[0:M]) pgnbc = gnbc.predict(new_testset_data) rgnbc = metrics.accuracy_score(pgnbc, new_testset_params) mgnbc = calculate_mse(pgnbc,new_testset_params) # hmmc.fit(new_trainset_data[0:M,:], new_trainset_params[0:M]) phmmc = hmmc.predict(new_testset_data) rhmmc = metrics.accuracy_score(phmmc, new_testset_params) mhmmc = calculate_mse(phmmc,new_testset_params) # try: avac.fit(new_trainset_data[0:M,:], new_trainset_params[0:M]) pavac = avac.predict(new_testset_data) ravac = metrics.accuracy_score(pavac, new_testset_params) mavac = calculate_mse(pavac,new_testset_params) except: ravac = 0 mavac = inf # # # # # try: hmmsvmc.fit(new_trainset_data[0:M,:], new_trainset_params[0:M]) phmmsvm = hmmsvmc.predict(new_testset_data) rhmmsvm = metrics.accuracy_score(phmmsvm, new_testset_params) mhmmsvm = calculate_mse(phmmsvm,new_testset_params) # except: rhmmsvm = 0 mavac = inf # # ratios_gnb.append(rgnbc) # ratios_hmm.append(rhmmc) # ratios_svm.append(ravac) # ratios_hmmsvm.append(rhmmsvm) # # mse_gnb.append(mgnbc) # mse_hmm.append(mhmmc) # mse_svm.append(mavac) # mse_hmmsvm.append(mhmmsvm) # # fullratios.append((trial,X,ratios_gnb,ratios_hmm,ratios_svm,ratios_hmmsvm)) # fullmses.append((trial,X,mse_gnb,mse_hmm,mse_svm,mse_hmmsvm)) # # # # # hmmc = HmmClassifier3(N=3, n_components = max(unique(parameters_state))+1) hmmc = HmmClassifier3(N=CHAINSIZE, n_components = NCOMPONENTS) gnbc = GaussianNB() avac = MyAVAClassifier() hmmsvmc = HMMsvmClassifier() sinkholec = SinkHoleClassifier() hmmc.fit(new_trainset_data, new_trainset_params) gnbc.fit(new_trainset_data, new_trainset_params) # try: avac.fit(new_trainset_data, new_trainset_params) sinkholec.fit(new_trainset_data, new_trainset_params) pavac = avac.predict(new_testset_data) # except: # pavac = ones((new_testset_data.shape[0],))*new_trainset_params[0] # try: hmmsvmc.fit(new_trainset_data, new_trainset_params) phmmsvm = hmmsvmc.predict(new_testset_data) # except: # # # phmmsvm = hmmsvmc.predict(new_testset_data) # phmmsvm = ones((new_testset_data.shape[0],))*new_trainset_params[0] phmmc = hmmc.predict(new_testset_data) pgnbc = gnbc.predict(new_testset_data) psinkhole = sinkholec.predict(new_testset_data) print "[II] (Trial %d) Classification ratio for GNB: %.2f" % (trial, metrics.accuracy_score(pgnbc, new_testset_params)) print "[II] (Trial %d) Classification ratio for SVM: %.2f" % (trial, metrics.accuracy_score(pavac, new_testset_params)) print "[II] (Trial %d) Classification ratio for HMM: %.2f" % (trial, metrics.accuracy_score(phmmc, new_testset_params)) print "[II] (Trial %d) Classification ratio for HMM/SVM: %.2f" % (trial, metrics.accuracy_score(phmmsvm, new_testset_params)) print "[II] (Trial %d) Classification ratio for Sinkhole Approach: %.2f" % (trial, metrics.accuracy_score(psinkhole, new_testset_params)) mse # rgnbc = rhmmc*(trial-1)/trial + metrics.accuracy_score(pgnbc, new_testset_params)/trial # ravac = rhmmc*(trial-1)/trial + metrics.accuracy_score(pavac, new_testset_params)/trial # rhmmc = rhmmc*(trial-1)/trial + metrics.accuracy_score(phmmc, new_testset_params)/trial # rhmmsvmc = rhmmc*(trial-1)/trial + metrics.accuracy_score(phmmsvm, new_testset_params)/trial rgnbc = metrics.accuracy_score(pgnbc, new_testset_params) ravac = metrics.accuracy_score(pavac, new_testset_params) rhmmc = metrics.accuracy_score(phmmc, new_testset_params) rhmmsvmc = metrics.accuracy_score(phmmsvm, new_testset_params) rsinkhole = metrics.accuracy_score(psinkhole, new_testset_params) ratios_gnb.append(rgnbc) ratios_svm.append(ravac) ratios_hmm.append(rhmmc) ratios_hmmsvm.append(rhmmsvmc) ratios_sinkhole.append(rsinkhole) # msegbnc = msegbnc*(trial-1)/trial + calculate_mse(pgnbc,new_testset_params)/trial # msehmmc = msehmmc*(trial-1)/trial + calculate_mse(phmmc,new_testset_params)/trial # mseavac = mseavac*(trial-1)/trial + calculate_mse(pavac,new_testset_params)/trial # msehmmsvmc = msehmmsvmc*(trial-1)/trial + calculate_mse(phmmsvm,new_testset_params)/trial msegnb = calculate_mse(pgnbc,new_testset_params) msesvm = calculate_mse(pavac,new_testset_params) msehmm = calculate_mse(phmmc,new_testset_params) msehmmsvm = calculate_mse(phmmsvm,new_testset_params) msesinkhole = calculate_mse(psinkhole,new_testset_params) mse_gnb.append(msegnb) mse_svm.append(msesvm) mse_hmm.append(msehmm) mse_hmmsvm.append(msehmmsvm) mse_sinkhole.append(msesinkhole) trial += 1 print ratios_gnb sucrats = [mean(ratios_gnb),mean(ratios_svm), mean(ratios_hmm),mean(ratios_hmmsvm), mean(ratios_sinkhole)] sucratsstd = [std(ratios_gnb,ddof=1),std(ratios_svm,ddof=1), std(ratios_hmm,ddof=1),std(ratios_hmmsvm,ddof=1), std(ratios_sinkhole,ddof=1)] mserats = [mean(mse_gnb),mean(mse_svm), mean(mse_hmm),mean(mse_hmmsvm), mean(mse_sinkhole)] #mserats = [msegbnc, mseavac, msehmmc,msehmmsvmc] print tuple(sucrats) # print tuple(sucratsstd) # scores = zeros((len(sucrats), len(sucrats))) # for i in range(0, len(sucrats)): # for j in range(0, len(sucrats)): # scores[i,j] = (sucrats[i] - sucrats[j])/sqrt(sucratsstd[i]**2 + sucratsstd[j]**2) print tuple(mserats) # print scores # print (msegbnc, mseavac, msehmmc,msehmmsvmc ) sample_sizes = X/float(max(X)) gnbc_ratios_curves = zeros((len(X),len(fullratios))) gnbc_mses_curves = zeros((len(X),len(fullratios))) # avac_ratios_curves = zeros((len(X),len(fullratios))) avac_mses_curves = zeros((len(X),len(fullratios))) # hmmc_ratios_curves = zeros((len(X),len(fullratios))) hmmc_mses_curves = zeros((len(X),len(fullratios))) # hmmsvmc_ratios_curves = zeros((len(X),len(fullratios))) hmmsvmc_mses_curves = zeros((len(X),len(fullratios))) # # for i in range(0, len(fullratios)): for j in range(0, len(X)): gnbc_ratios_curves[j,i] = fullratios[i][2][j] gnbc_mses_curves[j,i] = fullmses[i][2][j] avac_ratios_curves[j,i] = fullratios[i][3][j] avac_mses_curves[j,i] = fullmses[i][3][j] hmmc_ratios_curves[j,i] = fullratios[i][4][j] hmmc_mses_curves[j,i] = fullmses[i][4][j] hmmsvmc_ratios_curves[j,i] = fullratios[i][5][j] hmmsvmc_mses_curves[j,i] = fullmses[i][5][j] # # # # # gnbc_mses_curve = mean(gnbc_mses_curves,1) avac_mses_curve = mean(avac_mses_curves,1) hmmc_mses_curve = mean(hmmc_mses_curves,1) hmmsvmc_mses_curve = mean(hmmsvmc_mses_curves,1) # gnbc_ratios_curve = mean(gnbc_ratios_curves,1) avac_ratios_curve = mean(avac_ratios_curves,1) hmmc_ratios_curve = mean(hmmc_ratios_curves,1) hmmsvmc_ratios_curve = mean(hmmsvmc_ratios_curves,1) # figure() subplot(211) plot(sample_sizes,gnbc_ratios_curve) plot(sample_sizes,avac_ratios_curve) plot(sample_sizes,hmmc_ratios_curve) plot(sample_sizes,hmmsvmc_ratios_curve) xlabel('Part of training set used') ylabel('Accuracy') legend(['GNB','SVM','HMM','HMM/SVM']) # subplot(212) plot(sample_sizes,gnbc_mses_curve) plot(sample_sizes,avac_mses_curve) plot(sample_sizes,hmmc_mses_curve) plot(sample_sizes,hmmsvmc_mses_curve) xlabel('Part of training set used') ylabel('Mean Squared Error') # legend(['GNB','SVM','HMM','HMM/SVM']) # show() # # tight_layout() savefig('ratios-2.png',dpi=500) # # #