Mercurial > hg > chourdakisreiss2016
view experiment-reverb/code/cmodels.py @ 2:c87a9505f294 tip
Added LICENSE for code, removed .wav files
author | Emmanouil Theofanis Chourdakis <e.t.chourdakis@qmul.ac.uk> |
---|---|
date | Sat, 30 Sep 2017 13:25:50 +0100 |
parents | 246d5546657c |
children |
line wrap: on
line source
#!/usr/bin/python2 # -*- coding: utf-8 -*- """ Classifier models @author: Emmanouil Theofanis Chourdakis """ # Imports needed from hmmlearn.hmm import GMM from hmmlearn import hmm from sklearn import svm import copy as cp from scipy.misc import logsumexp import pickle from collections import Counter class HMMsvm: def __init__(self, svm_): self.svm = svm_ # self.svm = MyAVAClassifier() def fit(self, features_list, states, flr=1e-13): # def fit(self, features_list, flr=1e-13): # TODO: Concatenate features, train #self.svm.fit(X, states, flr) #self.prior = self.svm.prior #self.transmat = self.svm.transmat features = None for f in features_list: if features is None: features = f else: features = append(features, f, 0) fullstates = None # T = features.shape[0] for s in states: if fullstates is None: fullstates = s else: fullstates = append(fullstates, s) self.n_classes = self.svm.n_classes n_classes = self.n_classes # print fullstates, shape(fullstates) h = histogram(fullstates, n_classes)[0].astype(float) self.prior = h/sum(h) # Add a very small probability for random jump self.prior += flr self.prior = self.prior/sum(self.prior) transmat = zeros((n_classes, n_classes)) transitions = zeros((n_classes, )) for s in states: for i in range(1, len(s)): prev = s[i-1] cur = s[i] transmat[prev,cur] += 1 transitions[prev] += 1 transitions[transitions == 0] = 1 for i in range(0, transmat.shape[0]): transmat[i,:] = transmat[i,:]/transitions[i] self.transmat = transmat # Add a very small probability for random jump to avoid zero values self.transmat += flr for i in range(0, self.transmat.shape[0]): self.transmat[i,:] = self.transmat[i,:]/sum(self.transmat[i,:]) A = zeros((n_classes, n_classes)) Aold = self.transmat while mse(A,Aold)>10*size(A)*flr: Aold = A.copy() A = zeros((n_classes, n_classes)) transitions = zeros((n_classes, )) for i in range(0,len(features_list)): f = features_list[i] # print "FEATURES:", f # print f s,p = self.logviterbi(f) # print s for j in range(1, len(s)): prev = s[j-1] cur = s[j] A[prev,cur] += 1 transitions[prev] += 1 transitions[transitions == 0] = 1 for i in range(0, A.shape[0]): A[i,:] = A[i,:]/transitions[i] A += flr self.transmat = A for i in range(0, A.shape[0]): if sum(A[i,:]) > 0: A[i,:] = A[i,:]/sum(A[i,:]) # print observations def estimate_emission_probability(self, x, q): post = self.svm.estimate_posterior_probability_vector(x) # print "post: ", post prior = self.prior # print "prior: ", prior p = post/prior p = p/sum(p) return p[q] # def estimate_emission_probability(self, x, q): # return self.svm.estimate_emission_probability(q, x) def predict(self, X): return self.logviterbi(X)[0] def logviterbi(self, X): # Number of states N = self.n_classes # Number of observations T = X.shape[0] transmat = self.transmat #1. Initalization a1 = self.prior delta = zeros((T,N)) psi = zeros((T,N)) for i in range(0, N): delta[0, i] = log(self.transmat[0,i]) + log(self.estimate_emission_probability(X[0,:],i)) #2. Recursion for t in range(1, T): # delta_old = delta.copy() x = X[t, :] for j in range(0, N): # print "j: %d, S" % j, delta_old+log(transmat[:,j]) delta[t,j] = max(delta[t-1,:] + log(transmat[:,j])) + log(self.estimate_emission_probability(x,j)) psi[t,j] = argmax(delta[t-1,:] + log(transmat[:,j])) # print "t: %d, delta: " % t, delta # print "t: %d, psi:" % t, psi # 3. Termination p_star = max(delta[T-1,:] + log(transmat[:,N-1])) q_star = argmax(delta[T-1,:] + log(transmat[:, N-1])) # 4. Backtracking q = zeros((T,)) p = zeros((T,)) q[-1] = q_star p[-1] = p_star for t in range(1, T): q[-t-1] = psi[-t, q[-t]] p[-t-1] = delta[-t, q[-t]] return q,p def viterbi(self, X): # Number of states N = self.n_classes # Number of observations T = X.shape[0] transmat = self.transmat #1. Initialization a1 = self.prior delta = zeros((N, )) psi = zeros((N, )) for i in range(0, N): delta[i] = a1[i]*self.estimate_emission_probability(X[0,:],i) #2. Recursion for t in range(1, T): delta_old = delta.copy() x = X[t,:] for j in range(0, N): delta[j] = max(delta_old*transmat[:,j])*self.estimate_emission_probability(x, j) psi[j] = argmax(delta_old*transmat[:,j]) #3. Termination p_star = max(delta*transmat[:,N-1]) q_star = argmax(delta*transmat[:,N-1]) #4. Backtracking q = zeros((T,)) q[-1] = q_star for t in range(1, T): q[-t-1] = psi[q[-t]] return q def _log_likelihood_matrix(self, X): N = self.n_classes ll = zeros((X.shape[0], N)) for i in range(0, X.shape[0]): ll[i,:] = self._forward(i, X) return ll def compute_ksus(self, X): N = self.n_classes T = X.shape[0] print "[II] Computing gammas... " gammas = self._forward_backward(X) # Initialize aij = self.transmat def _not_quite_ksu(self, t, i, j, X): alpha = exp(self.forward(X[0:t+1,:]))[i] beta = exp(self.backward(X[t+1:,:]))[j] aij = self.transmat[i,j] bj = self.estimate_emission_probability(X[t+1,:], j) return alpha*beta*aij*bj def _ksu(self, t, i, j, X): alphaT = exp(self.forward(X[0:t+1,:]))[0] return self._not_quite_ksu(t,i,j,X) def _forward_backward(self, X): T = X.shape[0] N = self.n_classes fv = zeros((T, N)) sv = zeros((T, N)) b = None for t in range(1, T+1): fv[t-1,:] = self._forward_message(fv[t-2,:], X[0:t, ]) for t in range(1, T+1): b = self._backward_message(b, X[-t: , :]) sv[-t,:] = lognorm(fv[t-1,:]*b) return sv def _backward(self, t, X): N = self.n_classes a = ones((N,))/N beta = zeros((N, )) transmat = self.transmat T = X.shape[0] x = X[t, :] if t == T-1: beta = log(a) return beta else: tosum = zeros((N, )) bt = self._backward(t+1, X) btnew = zeros((N, )) # print bt for j in range(0, N): for i in range(0, N): # print log(self.estimate_emission_probability(x, j)), bt[i], log(transmat[i,j]) tosum[i] = log(self.estimate_emission_probability(x, j)) + bt[i] + log(transmat[i,j]) # print tosum btnew[j] = logsumexp(tosum) btnew = lognorm(btnew) return btnew def score(self, X): return self.forward(X) def forward(self, X): T = X.shape[0] f = None for t in range(1, T+1): f = self._forward_message(f, X[0:t, :]) return f def backward(self, X): T = X.shape[0] b = None for t in range(1,T+1): # print t # print t,b idx = arange(-t,T) # print idx b = self._backward_message(b, X[-t:, :]) return b def _backward_message(self, b, X): N = self.n_classes beta = zeros((N, )) transmat = self.transmat x = X[0, :] if X.shape[0] == 1: beta = log(ones((N,))/N) return beta else: tosum = zeros((N, )) btnew = zeros((N, )) for j in range(0, N): for i in range(0, N): tosum[i] = log(self.estimate_emission_probability(x,j)) + b[i] + log(transmat[i,j]) btnew[j] = logsumexp(tosum) # btnew = lognorm(btnew) return btnew def _forward_message(self, f, X): N = self.n_classes a = self.prior alpha = zeros((N, )) transmat = self.transmat x = X[-1, :] if X.shape[0] == 1: for i in range(0, N): alpha[i] = log(a[i]) + log(self.estimate_emission_probability(x, i)) # alpha = lognorm(alpha) return alpha else: tosum = zeros((N,)) ftnew = zeros((N,)) for j in range(0, N): for i in range(0, N): tosum[i] = f[i] + log(transmat[i,j]) Sum = logsumexp(tosum) bj = self.estimate_emission_probability(x, j) ftnew[j] = Sum+log(bj) # ftnew = lognorm(ftnew) return ftnew def _forward(self, t, X): N = self.n_classes a = self.prior # print a alpha = zeros((N, )) transmat = self.transmat x = X[t, :] if t == 0: for i in range(0, N): alpha[i] = log(a[i]) + log(self.estimate_emission_probability(x, i)) # print "--" # print alpha alpha = lognorm(alpha) # print alpha # print "xx" return alpha else: tosum = zeros((N, )) ft = self._forward(t-1, X) ftnew = zeros((N, )) for j in range(0, N): for i in range(0, N): # print ft tosum[i] = ft[i] + log(transmat[i,j]) Sum = logsumexp(tosum) bj = self.estimate_emission_probability(x, j) ftnew[j] = Sum+log(bj) ftnew = lognorm(ftnew) return ftnew