Mercurial > hg > chourdakisreiss2016
diff experiment-reverb/code/cmodels.py @ 0:246d5546657c
initial commit, needs cleanup
author: Emmanouil Theofanis Chourdakis <e.t.chourdakis@qmul.ac.uk>
date: Wed, 14 Dec 2016 13:15:48 +0000
parents | |
children |
line wrap: on
line diff
#!/usr/bin/python2
# -*- coding: utf-8 -*-

"""
Classifier models: a hybrid HMM whose frame-wise emission probabilities are
supplied by a wrapped discriminative classifier (an SVM-like object).

@author: Emmanouil Theofanis Chourdakis
"""

import pickle
import copy as cp
from collections import Counter

import numpy as np

# hmmlearn / sklearn are only needed by code that constructs the underlying
# frame classifiers; guard them so the pure-numpy HMM machinery below can be
# imported even when they are not installed.
try:
    from hmmlearn.hmm import GMM
    from hmmlearn import hmm
    from sklearn import svm
except ImportError:  # NOTE(review): optional — nothing below uses them directly
    GMM = hmm = svm = None

# scipy.misc.logsumexp was removed in SciPy >= 1.0; prefer the current home.
try:
    from scipy.special import logsumexp
except ImportError:  # pragma: no cover - very old SciPy
    from scipy.misc import logsumexp


def _mse(a, b):
    """Mean squared difference between two equal-shaped arrays.

    NOTE(review): the original called an undefined global ``mse`` (probably
    star-imported elsewhere); mean-squared-error is the standard reading —
    confirm against the rest of the project.
    """
    return np.mean((np.asarray(a) - np.asarray(b)) ** 2)


def _lognorm(v):
    """Normalize a log-probability vector so it sums to 1 in probability space.

    NOTE(review): the original called an undefined global ``lognorm``; this is
    the conventional log-domain normalization — confirm against the project.
    """
    return v - logsumexp(v)


class HMMsvm:
    """HMM whose emission model is a wrapped classifier.

    The wrapped object must expose:
      * ``n_classes`` — number of states/classes;
      * ``estimate_posterior_probability_vector(x)`` — P(state | frame x).

    ``fit`` estimates the state prior and transition matrix from labelled
    sequences, then iteratively re-estimates the transitions from Viterbi
    decodes of the training data.
    """

    def __init__(self, svm_):
        # The (already trained) frame classifier supplying posteriors.
        self.svm = svm_

    def fit(self, features_list, states, flr=1e-13):
        """Estimate ``prior`` and ``transmat`` from labelled sequences.

        Parameters
        ----------
        features_list : list of (T_i, D) arrays, one per training sequence.
        states : list of (T_i,) integer state-label sequences.
        flr : float
            Probability floor added everywhere to avoid exact zeros
            (and hence -inf in the log domain).
        """
        self.n_classes = self.svm.n_classes
        n_classes = self.n_classes

        # Pool every labelled frame to estimate the state prior.
        fullstates = np.concatenate([np.asarray(s) for s in states])

        h = np.histogram(fullstates, n_classes)[0].astype(float)
        self.prior = h / h.sum()

        # Floor + renormalize: allow a tiny probability of any state.
        self.prior += flr
        self.prior = self.prior / self.prior.sum()

        # Count labelled transitions, then row-normalize.
        transmat = np.zeros((n_classes, n_classes))
        transitions = np.zeros((n_classes,))
        for s in states:
            for i in range(1, len(s)):
                transmat[s[i - 1], s[i]] += 1
                transitions[s[i - 1]] += 1

        transitions[transitions == 0] = 1  # avoid 0/0 for unseen source states
        transmat = transmat / transitions[:, None]

        # Floor + renormalize the transition matrix as well.
        transmat += flr
        self.transmat = transmat / transmat.sum(axis=1, keepdims=True)

        # Iteratively re-estimate the transitions from Viterbi decodes of the
        # training sequences until the matrix stops changing (self-training).
        A = np.zeros((n_classes, n_classes))
        Aold = self.transmat

        while _mse(A, Aold) > 10 * A.size * flr:
            Aold = A.copy()
            A = np.zeros((n_classes, n_classes))
            transitions = np.zeros((n_classes,))

            for f in features_list:
                s, _ = self.logviterbi(f)
                for j in range(1, len(s)):
                    A[s[j - 1], s[j]] += 1
                    transitions[s[j - 1]] += 1

            transitions[transitions == 0] = 1
            A = A / transitions[:, None]
            A += flr

            # As in the original: install A, then row-normalize it in place so
            # self.transmat and A stay the same (normalized) object.
            self.transmat = A
            for i in range(A.shape[0]):
                if A[i, :].sum() > 0:
                    A[i, :] = A[i, :] / A[i, :].sum()

    def estimate_emission_probability(self, x, q):
        """Pseudo-likelihood of frame ``x`` under state ``q``.

        Bayes' rule up to a constant: posterior / prior, renormalized over
        states, then the ``q``-th entry is returned.
        """
        post = self.svm.estimate_posterior_probability_vector(x)
        p = post / self.prior
        p = p / p.sum()
        return p[q]

    def predict(self, X):
        """Most likely state path for observation matrix ``X`` (T, D)."""
        return self.logviterbi(X)[0]

    def logviterbi(self, X):
        """Log-domain Viterbi decoding of ``X`` (T, D).

        Returns
        -------
        q : (T,) int array — decoded state path.
        p : (T,) array — per-step path log-scores from the backtracking pass.
        """
        N = self.n_classes
        T = X.shape[0]
        transmat = self.transmat

        delta = np.zeros((T, N))
        # int dtype: psi holds backpointers that are used as indices.
        psi = np.zeros((T, N), dtype=int)

        # 1. Initialization.
        # NOTE(review): the original seeds with transmat[0, :] rather than the
        # prior — preserved as-is.
        for i in range(N):
            delta[0, i] = np.log(transmat[0, i]) \
                + np.log(self.estimate_emission_probability(X[0, :], i))

        # 2. Recursion.
        for t in range(1, T):
            x = X[t, :]
            for j in range(N):
                step = delta[t - 1, :] + np.log(transmat[:, j])
                delta[t, j] = step.max() \
                    + np.log(self.estimate_emission_probability(x, j))
                psi[t, j] = step.argmax()

        # 3. Termination.
        # NOTE(review): the original scores the transition into the *last*
        # state (column N-1) here — preserved as-is.
        last = delta[T - 1, :] + np.log(transmat[:, N - 1])
        p_star = last.max()
        q_star = last.argmax()

        # 4. Backtracking (int path so it can index psi/delta; the original
        # used a float array, which fails as an index on modern numpy).
        q = np.zeros((T,), dtype=int)
        p = np.zeros((T,))
        q[-1] = q_star
        p[-1] = p_star
        for t in range(1, T):
            q[-t - 1] = psi[-t, q[-t]]
            p[-t - 1] = delta[-t, q[-t]]

        return q, p

    def viterbi(self, X):
        """Probability-domain Viterbi decoding; returns only the state path.

        Prefer :meth:`logviterbi` for anything but short sequences (products
        of probabilities underflow quickly).
        """
        N = self.n_classes
        T = X.shape[0]
        transmat = self.transmat

        a1 = self.prior
        delta = np.zeros((N,))
        # Fixed from the original: keep one backpointer row per timestep
        # (the original kept a single row and overwrote it every step, so
        # backtracking used stale pointers).
        psi = np.zeros((T, N), dtype=int)

        # 1. Initialization (prior * emission).
        for i in range(N):
            delta[i] = a1[i] * self.estimate_emission_probability(X[0, :], i)

        # 2. Recursion.
        for t in range(1, T):
            delta_old = delta.copy()
            x = X[t, :]
            for j in range(N):
                step = delta_old * transmat[:, j]
                delta[j] = step.max() * self.estimate_emission_probability(x, j)
                psi[t, j] = step.argmax()

        # 3. Termination (transition into the last state, as in logviterbi).
        q_star = np.argmax(delta * transmat[:, N - 1])

        # 4. Backtracking.
        q = np.zeros((T,), dtype=int)
        q[-1] = q_star
        for t in range(1, T):
            q[-t - 1] = psi[-t, q[-t]]

        return q

    def _log_likelihood_matrix(self, X):
        """Stack of normalized forward messages, one row per frame of ``X``."""
        N = self.n_classes
        ll = np.zeros((X.shape[0], N))
        for i in range(X.shape[0]):
            ll[i, :] = self._forward(i, X)
        return ll

    def compute_ksus(self, X):
        """Unfinished: was intended to compute the Baum-Welch xi quantities.

        NOTE(review): incomplete in the original — gammas/aij are computed
        but nothing is returned or stored.
        """
        print("[II] Computing gammas... ")
        gammas = self._forward_backward(X)
        aij = self.transmat

    def _not_quite_ksu(self, t, i, j, X):
        """Unnormalized xi_t(i, j): alpha_t(i) * a_ij * b_j(x_{t+1}) * beta_{t+1}(j)."""
        alpha = np.exp(self.forward(X[0:t + 1, :]))[i]
        beta = np.exp(self.backward(X[t + 1:, :]))[j]
        aij = self.transmat[i, j]
        bj = self.estimate_emission_probability(X[t + 1, :], j)
        return alpha * beta * aij * bj

    def _ksu(self, t, i, j, X):
        """xi_t(i, j).

        NOTE(review): the original computed a normalizer ``alphaT`` but never
        used it — the division by the total likelihood was never finished.
        """
        return self._not_quite_ksu(t, i, j, X)

    def _forward_backward(self, X):
        """Forward-backward smoothing pass; returns per-frame smoothed values."""
        T = X.shape[0]
        N = self.n_classes
        fv = np.zeros((T, N))
        sv = np.zeros((T, N))

        b = None
        for t in range(1, T + 1):
            fv[t - 1, :] = self._forward_message(fv[t - 2, :], X[0:t, :])

        for t in range(1, T + 1):
            b = self._backward_message(b, X[-t:, :])
            # NOTE(review): fv[t-1] pairs a forward time index with a backward
            # one, and the log-messages are multiplied rather than added —
            # preserved from the original, but this looks wrong; verify
            # against the Rabiner formulation before relying on it.
            sv[-t, :] = _lognorm(fv[t - 1, :] * b)

        return sv

    def _backward(self, t, X):
        """Recursive normalized backward message beta_t (log domain)."""
        N = self.n_classes
        T = X.shape[0]
        transmat = self.transmat
        x = X[t, :]

        if t == T - 1:
            # Base case: uniform message over states.
            return np.log(np.ones((N,)) / N)

        bt = self._backward(t + 1, X)
        tosum = np.zeros((N,))
        btnew = np.zeros((N,))
        for j in range(N):
            for i in range(N):
                tosum[i] = np.log(self.estimate_emission_probability(x, j)) \
                    + bt[i] + np.log(transmat[i, j])
            btnew[j] = logsumexp(tosum)
        return _lognorm(btnew)

    def score(self, X):
        """Final forward log-message of ``X`` (alias for :meth:`forward`)."""
        return self.forward(X)

    def forward(self, X):
        """Run the forward recursion over all of ``X``; return the last message."""
        T = X.shape[0]
        f = None
        for t in range(1, T + 1):
            f = self._forward_message(f, X[0:t, :])
        return f

    def backward(self, X):
        """Run the backward recursion over all of ``X``; return the last message."""
        T = X.shape[0]
        b = None
        for t in range(1, T + 1):
            b = self._backward_message(b, X[-t:, :])
        return b

    def _backward_message(self, b, X):
        """One step of the backward recursion.

        ``b`` is the previous (later-in-time) message, or ``None``/unused when
        ``X`` has a single row (base case: uniform log message).
        """
        N = self.n_classes
        transmat = self.transmat
        x = X[0, :]

        if X.shape[0] == 1:
            return np.log(np.ones((N,)) / N)

        tosum = np.zeros((N,))
        btnew = np.zeros((N,))
        for j in range(N):
            for i in range(N):
                tosum[i] = np.log(self.estimate_emission_probability(x, j)) \
                    + b[i] + np.log(transmat[i, j])
            btnew[j] = logsumexp(tosum)
        return btnew

    def _forward_message(self, f, X):
        """One step of the forward recursion.

        ``f`` is the previous message, or unused when ``X`` has a single row
        (base case: prior + emission, unnormalized as in the original).
        """
        N = self.n_classes
        a = self.prior
        transmat = self.transmat
        x = X[-1, :]

        if X.shape[0] == 1:
            alpha = np.zeros((N,))
            for i in range(N):
                alpha[i] = np.log(a[i]) \
                    + np.log(self.estimate_emission_probability(x, i))
            return alpha

        tosum = np.zeros((N,))
        ftnew = np.zeros((N,))
        for j in range(N):
            for i in range(N):
                tosum[i] = f[i] + np.log(transmat[i, j])
            ftnew[j] = logsumexp(tosum) \
                + np.log(self.estimate_emission_probability(x, j))
        return ftnew

    def _forward(self, t, X):
        """Recursive *normalized* forward message alpha_t (log domain)."""
        N = self.n_classes
        a = self.prior
        transmat = self.transmat
        x = X[t, :]

        alpha = np.zeros((N,))
        if t == 0:
            for i in range(N):
                alpha[i] = np.log(a[i]) \
                    + np.log(self.estimate_emission_probability(x, i))
            return _lognorm(alpha)

        ft = self._forward(t - 1, X)
        tosum = np.zeros((N,))
        for j in range(N):
            for i in range(N):
                tosum[i] = ft[i] + np.log(transmat[i, j])
            alpha[j] = logsumexp(tosum) \
                + np.log(self.estimate_emission_probability(x, j))
        return _lognorm(alpha)