e@0: #!/usr/bin/python2 e@0: # -*- coding: utf-8 -*- e@0: e@0: """ e@0: Classifier models e@0: @author: Emmanouil Theofanis Chourdakis e@0: """ e@0: e@0: e@0: # Imports needed e@0: e@0: from hmmlearn.hmm import GMM e@0: from hmmlearn import hmm e@0: e@0: from sklearn import svm e@0: import copy as cp e@0: e@0: from scipy.misc import logsumexp e@0: import pickle e@0: e@0: from collections import Counter e@0: e@0: class HMMsvm: e@0: def __init__(self, svm_): e@0: self.svm = svm_ e@0: e@0: e@0: # self.svm = MyAVAClassifier() e@0: e@0: e@0: def fit(self, features_list, states, flr=1e-13): e@0: # def fit(self, features_list, flr=1e-13): e@0: e@0: # TODO: Concatenate features, train e@0: #self.svm.fit(X, states, flr) e@0: #self.prior = self.svm.prior e@0: #self.transmat = self.svm.transmat e@0: e@0: e@0: features = None e@0: e@0: for f in features_list: e@0: if features is None: e@0: features = f e@0: else: e@0: features = append(features, f, 0) e@0: e@0: fullstates = None e@0: # T = features.shape[0] e@0: for s in states: e@0: if fullstates is None: e@0: fullstates = s e@0: else: e@0: fullstates = append(fullstates, s) e@0: e@0: e@0: e@0: e@0: e@0: e@0: self.n_classes = self.svm.n_classes e@0: n_classes = self.n_classes e@0: e@0: # print fullstates, shape(fullstates) e@0: e@0: h = histogram(fullstates, n_classes)[0].astype(float) e@0: self.prior = h/sum(h) e@0: e@0: # Add a very small probability for random jump e@0: e@0: self.prior += flr e@0: self.prior = self.prior/sum(self.prior) e@0: e@0: transmat = zeros((n_classes, n_classes)) e@0: transitions = zeros((n_classes, )) e@0: for s in states: e@0: for i in range(1, len(s)): e@0: prev = s[i-1] e@0: cur = s[i] e@0: transmat[prev,cur] += 1 e@0: transitions[prev] += 1 e@0: e@0: transitions[transitions == 0] = 1 e@0: e@0: for i in range(0, transmat.shape[0]): e@0: transmat[i,:] = transmat[i,:]/transitions[i] e@0: e@0: self.transmat = transmat e@0: e@0: # Add a very small probability for random jump to avoid zero values e@0: e@0: self.transmat += flr e@0: e@0: for i in range(0, self.transmat.shape[0]): e@0: self.transmat[i,:] = self.transmat[i,:]/sum(self.transmat[i,:]) e@0: e@0: e@0: A = zeros((n_classes, n_classes)) e@0: e@0: Aold = self.transmat e@0: e@0: while mse(A,Aold)>10*size(A)*flr: e@0: Aold = A.copy() e@0: A = zeros((n_classes, n_classes)) e@0: transitions = zeros((n_classes, )) e@0: e@0: for i in range(0,len(features_list)): e@0: f = features_list[i] e@0: # print "FEATURES:", f e@0: # print f e@0: s,p = self.logviterbi(f) e@0: # print s e@0: for j in range(1, len(s)): e@0: prev = s[j-1] e@0: cur = s[j] e@0: A[prev,cur] += 1 e@0: transitions[prev] += 1 e@0: e@0: transitions[transitions == 0] = 1 e@0: e@0: e@0: for i in range(0, A.shape[0]): e@0: A[i,:] = A[i,:]/transitions[i] e@0: e@0: A += flr e@0: e@0: e@0: e@0: self.transmat = A e@0: e@0: for i in range(0, A.shape[0]): e@0: if sum(A[i,:]) > 0: e@0: A[i,:] = A[i,:]/sum(A[i,:]) e@0: e@0: e@0: e@0: # print observations e@0: e@0: e@0: def estimate_emission_probability(self, x, q): e@0: post = self.svm.estimate_posterior_probability_vector(x) e@0: # print "post: ", post e@0: prior = self.prior e@0: # print "prior: ", prior e@0: p = post/prior e@0: p = p/sum(p) e@0: e@0: return p[q] e@0: e@0: # def estimate_emission_probability(self, x, q): e@0: # return self.svm.estimate_emission_probability(q, x) e@0: e@0: e@0: def predict(self, X): e@0: return self.logviterbi(X)[0] e@0: e@0: e@0: def logviterbi(self, X): e@0: # Number of states e@0: e@0: N = self.n_classes e@0: e@0: # Number of 

    def logviterbi(self, X):
        """Viterbi decoding in the log domain.  Returns the most likely state
        sequence together with the corresponding log scores."""

        # Number of states
        N = self.n_classes

        # Number of observations
        T = X.shape[0]

        transmat = self.transmat

        # 1. Initialization (with the state priors)
        a1 = self.prior

        delta = zeros((T, N))
        psi = zeros((T, N), dtype=int)

        for i in range(0, N):
            delta[0, i] = log(a1[i]) + log(self.estimate_emission_probability(X[0, :], i))

        # 2. Recursion
        for t in range(1, T):
            x = X[t, :]
            for j in range(0, N):
                delta[t, j] = max(delta[t - 1, :] + log(transmat[:, j])) + log(self.estimate_emission_probability(x, j))
                psi[t, j] = argmax(delta[t - 1, :] + log(transmat[:, j]))

        # 3. Termination
        p_star = max(delta[T - 1, :] + log(transmat[:, N - 1]))
        q_star = argmax(delta[T - 1, :] + log(transmat[:, N - 1]))

        # 4. Backtracking
        q = zeros((T, ), dtype=int)
        p = zeros((T, ))

        q[-1] = q_star
        p[-1] = p_star
        for t in range(1, T):
            q[-t - 1] = psi[-t, q[-t]]
            p[-t - 1] = delta[-t, q[-t]]

        return q, p

    def viterbi(self, X):
        """Viterbi decoding in the linear domain.  Returns the most likely
        state sequence."""

        # Number of states
        N = self.n_classes

        # Number of observations
        T = X.shape[0]

        transmat = self.transmat

        # 1. Initialization
        a1 = self.prior

        delta = zeros((N, ))
        psi = zeros((N, ), dtype=int)

        for i in range(0, N):
            delta[i] = a1[i] * self.estimate_emission_probability(X[0, :], i)

        # 2. Recursion
        for t in range(1, T):
            delta_old = delta.copy()
            x = X[t, :]
            for j in range(0, N):
                delta[j] = max(delta_old * transmat[:, j]) * self.estimate_emission_probability(x, j)
                psi[j] = argmax(delta_old * transmat[:, j])

        # 3. Termination
        p_star = max(delta * transmat[:, N - 1])
        q_star = argmax(delta * transmat[:, N - 1])

        # 4. Backtracking
        q = zeros((T, ), dtype=int)
        q[-1] = q_star

        for t in range(1, T):
            q[-t - 1] = psi[q[-t]]

        return q

    def _log_likelihood_matrix(self, X):
        N = self.n_classes
        ll = zeros((X.shape[0], N))

        for i in range(0, X.shape[0]):
            ll[i, :] = self._forward(i, X)

        return ll

    def compute_ksus(self, X):
        N = self.n_classes
        T = X.shape[0]
        print "[II] Computing gammas..."
" e@0: e@0: gammas = self._forward_backward(X) e@0: e@0: # Initialize e@0: e@0: aij = self.transmat e@0: e@0: e@0: e@0: e@0: e@0: e@0: def _not_quite_ksu(self, t, i, j, X): e@0: alpha = exp(self.forward(X[0:t+1,:]))[i] e@0: beta = exp(self.backward(X[t+1:,:]))[j] e@0: e@0: aij = self.transmat[i,j] e@0: bj = self.estimate_emission_probability(X[t+1,:], j) e@0: e@0: return alpha*beta*aij*bj e@0: e@0: def _ksu(self, t, i, j, X): e@0: alphaT = exp(self.forward(X[0:t+1,:]))[0] e@0: e@0: return self._not_quite_ksu(t,i,j,X) e@0: e@0: e@0: def _forward_backward(self, X): e@0: T = X.shape[0] e@0: N = self.n_classes e@0: fv = zeros((T, N)) e@0: sv = zeros((T, N)) e@0: e@0: b = None e@0: for t in range(1, T+1): e@0: e@0: fv[t-1,:] = self._forward_message(fv[t-2,:], X[0:t, ]) e@0: e@0: for t in range(1, T+1): e@0: b = self._backward_message(b, X[-t: , :]) e@0: sv[-t,:] = lognorm(fv[t-1,:]*b) e@0: e@0: return sv e@0: e@0: e@0: def _backward(self, t, X): e@0: N = self.n_classes e@0: a = ones((N,))/N e@0: e@0: beta = zeros((N, )) e@0: transmat = self.transmat e@0: T = X.shape[0] e@0: x = X[t, :] e@0: if t == T-1: e@0: beta = log(a) e@0: e@0: return beta e@0: else: e@0: tosum = zeros((N, )) e@0: bt = self._backward(t+1, X) e@0: btnew = zeros((N, )) e@0: # print bt e@0: for j in range(0, N): e@0: for i in range(0, N): e@0: # print log(self.estimate_emission_probability(x, j)), bt[i], log(transmat[i,j]) e@0: tosum[i] = log(self.estimate_emission_probability(x, j)) + bt[i] + log(transmat[i,j]) e@0: # print tosum e@0: e@0: btnew[j] = logsumexp(tosum) e@0: btnew = lognorm(btnew) e@0: return btnew e@0: e@0: e@0: def score(self, X): e@0: return self.forward(X) e@0: e@0: def forward(self, X): e@0: T = X.shape[0] e@0: f = None e@0: for t in range(1, T+1): e@0: f = self._forward_message(f, X[0:t, :]) e@0: e@0: return f e@0: e@0: def backward(self, X): e@0: T = X.shape[0] e@0: b = None e@0: for t in range(1,T+1): e@0: # print t e@0: # print t,b e@0: idx = arange(-t,T) e@0: # print idx e@0: b = self._backward_message(b, X[-t:, :]) e@0: e@0: return b e@0: e@0: def _backward_message(self, b, X): e@0: N = self.n_classes e@0: e@0: e@0: beta = zeros((N, )) e@0: transmat = self.transmat e@0: e@0: x = X[0, :] e@0: e@0: if X.shape[0] == 1: e@0: beta = log(ones((N,))/N) e@0: return beta e@0: else: e@0: tosum = zeros((N, )) e@0: btnew = zeros((N, )) e@0: for j in range(0, N): e@0: for i in range(0, N): e@0: tosum[i] = log(self.estimate_emission_probability(x,j)) + b[i] + log(transmat[i,j]) e@0: e@0: btnew[j] = logsumexp(tosum) e@0: # btnew = lognorm(btnew) e@0: return btnew e@0: e@0: def _forward_message(self, f, X): e@0: N = self.n_classes e@0: a = self.prior e@0: e@0: alpha = zeros((N, )) e@0: transmat = self.transmat e@0: e@0: x = X[-1, :] e@0: e@0: if X.shape[0] == 1: e@0: for i in range(0, N): e@0: alpha[i] = log(a[i]) + log(self.estimate_emission_probability(x, i)) e@0: # alpha = lognorm(alpha) e@0: return alpha e@0: e@0: else: e@0: tosum = zeros((N,)) e@0: ftnew = zeros((N,)) e@0: for j in range(0, N): e@0: for i in range(0, N): e@0: tosum[i] = f[i] + log(transmat[i,j]) e@0: Sum = logsumexp(tosum) e@0: bj = self.estimate_emission_probability(x, j) e@0: ftnew[j] = Sum+log(bj) e@0: e@0: # ftnew = lognorm(ftnew) e@0: return ftnew e@0: e@0: def _forward(self, t, X): e@0: N = self.n_classes e@0: a = self.prior e@0: # print a e@0: alpha = zeros((N, )) e@0: transmat = self.transmat e@0: x = X[t, :] e@0: e@0: if t == 0: e@0: for i in range(0, N): e@0: alpha[i] = log(a[i]) + log(self.estimate_emission_probability(x, i)) e@0: # 
print "--" e@0: # print alpha e@0: alpha = lognorm(alpha) e@0: # print alpha e@0: # print "xx" e@0: return alpha e@0: else: e@0: tosum = zeros((N, )) e@0: ft = self._forward(t-1, X) e@0: ftnew = zeros((N, )) e@0: for j in range(0, N): e@0: for i in range(0, N): e@0: # print ft e@0: tosum[i] = ft[i] + log(transmat[i,j]) e@0: e@0: Sum = logsumexp(tosum) e@0: bj = self.estimate_emission_probability(x, j) e@0: ftnew[j] = Sum+log(bj) e@0: ftnew = lognorm(ftnew) e@0: e@0: return ftnew