diff experiment-reverb/code/cmodels.py @ 0:246d5546657c

initial commit, needs cleanup
author Emmanouil Theofanis Chourdakis <e.t.chourdakis@qmul.ac.uk>
date Wed, 14 Dec 2016 13:15:48 +0000
parents
children
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/experiment-reverb/code/cmodels.py	Wed Dec 14 13:15:48 2016 +0000
@@ -0,0 +1,461 @@
+#!/usr/bin/python2
+# -*- coding: utf-8 -*-
+
+"""
+Classifier models
+@author: Emmanouil Theofanis Chourdakis
+"""
+
+
+# Imports
+
+from hmmlearn.hmm import GMM
+from hmmlearn import hmm
+
+from sklearn import svm
+import copy as cp
+
+from numpy import append, argmax, exp, histogram, log, ones, size, zeros
+from scipy.misc import logsumexp
+import pickle
+
+from collections import Counter
+
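+# NOTE: the methods below use mse() and lognorm(), which are neither defined
+# in this module nor imported above. The minimal definitions that follow are
+# an assumption about the intended behaviour: mse() as the mean squared error
+# between two equally-shaped arrays, and lognorm() as normalisation of a
+# vector of log-probabilities so that the probabilities sum to one.
+
+def mse(A, B):
+    # Mean squared error between two arrays (assumed helper)
+    return ((A - B)**2).mean()
+
+
+def lognorm(logp):
+    # Normalise a log-probability vector (assumed helper)
+    return logp - logsumexp(logp)
+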
+class HMMsvm:
+    """Hybrid HMM classifier whose emission probabilities are derived from
+    an SVM (e.g. a MyAVAClassifier instance) that provides posterior class
+    probabilities via estimate_posterior_probability_vector()."""
+
+    def __init__(self, svm_):
+        self.svm = svm_
+
+    def fit(self, features_list, states, flr=1e-13):
+        # TODO: train self.svm on the concatenated features here as well;
+        # for now the SVM passed to __init__ is assumed to be trained already.
+
+        # Concatenate the feature matrices and the state sequences
+        features = None
+        for f in features_list:
+            if features is None:
+                features = f
+            else:
+                features = append(features, f, 0)
+
+        fullstates = None
+        for s in states:
+            if fullstates is None:
+                fullstates = s
+            else:
+                fullstates = append(fullstates, s)
+
+        self.n_classes = self.svm.n_classes
+        n_classes = self.n_classes
+
+        # Prior: relative frequency of each state, with a small floor
+        # probability so that no state gets a zero prior.
+        h = histogram(fullstates, n_classes)[0].astype(float)
+        self.prior = h/sum(h)
+        self.prior += flr
+        self.prior = self.prior/sum(self.prior)
+
+        # Initial transition matrix: normalised transition counts over the
+        # training state sequences.
+        transmat = zeros((n_classes, n_classes))
+        transitions = zeros((n_classes, ))
+        for s in states:
+            for i in range(1, len(s)):
+                prev = s[i-1]
+                cur = s[i]
+                transmat[prev, cur] += 1
+                transitions[prev] += 1
+
+        # Avoid division by zero for states never seen as a source
+        transitions[transitions == 0] = 1
+
+        for i in range(0, transmat.shape[0]):
+            transmat[i, :] = transmat[i, :]/transitions[i]
+
+        # Add a very small probability for a random jump to avoid zero
+        # values, then renormalise each row.
+        transmat += flr
+        for i in range(0, transmat.shape[0]):
+            transmat[i, :] = transmat[i, :]/sum(transmat[i, :])
+
+        self.transmat = transmat
+
+        # Re-estimate the transition matrix by Viterbi training: decode the
+        # training sequences, count the decoded transitions, and repeat
+        # until the matrix stops changing.
+        A = zeros((n_classes, n_classes))
+        Aold = self.transmat
+
+        while mse(A, Aold) > 10*size(A)*flr:
+            Aold = A.copy()
+            A = zeros((n_classes, n_classes))
+            transitions = zeros((n_classes, ))
+
+            for f in features_list:
+                s, p = self.logviterbi(f)
+                for j in range(1, len(s)):
+                    prev = s[j-1]
+                    cur = s[j]
+                    A[prev, cur] += 1
+                    transitions[prev] += 1
+
+            transitions[transitions == 0] = 1
+
+            for i in range(0, A.shape[0]):
+                A[i, :] = A[i, :]/transitions[i]
+
+            A += flr
+
+        self.transmat = A
+
+        # Renormalise the rows of the final transition matrix
+        for i in range(0, A.shape[0]):
+            if sum(A[i, :]) > 0:
+                A[i, :] = A[i, :]/sum(A[i, :])
+
+    def estimate_emission_probability(self, x, q):
+        # The SVM gives class posteriors P(q|x); dividing by the class
+        # priors gives a quantity proportional to the likelihood P(x|q),
+        # which is renormalised and used as the HMM emission probability.
+        post = self.svm.estimate_posterior_probability_vector(x)
+        prior = self.prior
+        p = post/prior
+        p = p/sum(p)
+
+        return p[q]
+
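+    # For illustration (numbers assumed): with posteriors P(q|x) = [0.6, 0.4]
+    # and priors P(q) = [0.75, 0.25], the ratios post/prior are [0.8, 1.6];
+    # renormalising gives emission probabilities [1/3, 2/3], so the rarer
+    # state receives the larger emission probability even though its
+    # posterior is smaller.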
+
+    def predict(self, X):
+        return self.logviterbi(X)[0]
+
+
+    def logviterbi(self, X):
+        # Viterbi decoding in the log domain
+
+        # Number of states
+        N = self.n_classes
+
+        # Number of observations
+        T = X.shape[0]
+
+        transmat = self.transmat
+
+        # 1. Initialisation: state priors times the emission probabilities
+        # of the first observation.
+        a1 = self.prior
+
+        delta = zeros((T, N))
+        psi = zeros((T, N), dtype=int)
+
+        for i in range(0, N):
+            delta[0, i] = log(a1[i]) + log(self.estimate_emission_probability(X[0, :], i))
+
+        # 2. Recursion
+        for t in range(1, T):
+            x = X[t, :]
+            for j in range(0, N):
+                delta[t, j] = max(delta[t-1, :] + log(transmat[:, j])) + log(self.estimate_emission_probability(x, j))
+                psi[t, j] = argmax(delta[t-1, :] + log(transmat[:, j]))
+
+        # 3. Termination
+        p_star = max(delta[T-1, :] + log(transmat[:, N-1]))
+        q_star = argmax(delta[T-1, :] + log(transmat[:, N-1]))
+
+        # 4. Backtracking
+        q = zeros((T,), dtype=int)
+        p = zeros((T,))
+
+        q[-1] = q_star
+        p[-1] = p_star
+        for t in range(1, T):
+            q[-t-1] = psi[-t, q[-t]]
+            p[-t-1] = delta[-t, q[-t]]
+
+        return q, p
+
+    def viterbi(self, X):
+        # Viterbi decoding in the probability domain
+
+        # Number of states
+        N = self.n_classes
+
+        # Number of observations
+        T = X.shape[0]
+
+        transmat = self.transmat
+
+        # 1. Initialisation
+        a1 = self.prior
+
+        delta = zeros((N, ))
+        psi = zeros((T, N), dtype=int)
+
+        for i in range(0, N):
+            delta[i] = a1[i]*self.estimate_emission_probability(X[0, :], i)
+
+        # 2. Recursion
+        for t in range(1, T):
+            delta_old = delta.copy()
+            x = X[t, :]
+            for j in range(0, N):
+                delta[j] = max(delta_old*transmat[:, j])*self.estimate_emission_probability(x, j)
+                psi[t, j] = argmax(delta_old*transmat[:, j])
+
+        # 3. Termination
+        p_star = max(delta*transmat[:, N-1])
+        q_star = argmax(delta*transmat[:, N-1])
+
+        # 4. Backtracking
+        q = zeros((T,), dtype=int)
+        q[-1] = q_star
+
+        for t in range(1, T):
+            q[-t-1] = psi[-t, q[-t]]
+
+        return q
+
+
+    def _log_likelihood_matrix(self, X):
+        N = self.n_classes
+        ll = zeros((X.shape[0], N))
+
+        for i in range(0, X.shape[0]):
+            ll[i,:] = self._forward(i, X)
+
+        return ll
+
+    def compute_ksus(self, X):
+        # TODO: the ksi (pairwise state posterior) computation is still
+        # missing; for now only the per-state gammas are computed.
+        N = self.n_classes
+        T = X.shape[0]
+        print "[II] Computing gammas... "
+
+        gammas = self._forward_backward(X)
+
+        # Initialize
+        aij = self.transmat
+
+    def _not_quite_ksu(self, t, i, j, X):
+        alpha = exp(self.forward(X[0:t+1,:]))[i]
+        beta = exp(self.backward(X[t+1:,:]))[j]
+
+        aij = self.transmat[i,j]
+        bj = self.estimate_emission_probability(X[t+1,:], j)
+
+        return alpha*beta*aij*bj
+
+    def _ksu(self, t, i, j, X):
+        alphaT = exp(self.forward(X[0:t+1,:]))[0]
+
+        return self._not_quite_ksu(t,i,j,X)
+
+
+    def _forward_backward(self, X):
+        # Forward-backward smoothing: combine the forward and backward
+        # messages (both in the log domain) into per-state log posteriors.
+        T = X.shape[0]
+        N = self.n_classes
+        fv = zeros((T, N))
+        sv = zeros((T, N))
+
+        # Forward pass over growing prefixes of X
+        for t in range(1, T+1):
+            fv[t-1, :] = self._forward_message(fv[t-2, :], X[0:t, :])
+
+        # Backward pass over growing suffixes of X, combined with the
+        # forward message of the same time step
+        b = None
+        for t in range(1, T+1):
+            b = self._backward_message(b, X[-t:, :])
+            sv[-t, :] = lognorm(fv[-t, :] + b)
+
+        return sv
+
+
+    def _backward(self, t, X):
+        # Recursive backward message for time step t (log domain)
+        N = self.n_classes
+        a = ones((N,))/N
+
+        beta = zeros((N, ))
+        transmat = self.transmat
+        T = X.shape[0]
+        x = X[t, :]
+        if t == T-1:
+            beta = log(a)
+            return beta
+        else:
+            tosum = zeros((N, ))
+            bt = self._backward(t+1, X)
+            btnew = zeros((N, ))
+            for j in range(0, N):
+                for i in range(0, N):
+                    tosum[i] = log(self.estimate_emission_probability(x, j)) + bt[i] + log(transmat[i,j])
+                btnew[j] = logsumexp(tosum)
+            btnew = lognorm(btnew)
+            return btnew
+
+
+    def score(self, X):
+        return self.forward(X)
+
+    def forward(self, X):
+        T = X.shape[0]
+        f = None
+        for t in range(1, T+1):
+            f = self._forward_message(f, X[0:t, :])
+
+        return f
+
+    def backward(self, X):
+        T = X.shape[0]
+        b = None
+        for t in range(1, T+1):
+            b = self._backward_message(b, X[-t:, :])
+
+        return b
+
+    def _backward_message(self, b, X):
+        N = self.n_classes
+
+        beta = zeros((N, ))
+        transmat = self.transmat
+
+        x = X[0, :]
+
+        if X.shape[0] == 1:
+            beta = log(ones((N,))/N)
+            return beta
+        else:
+            tosum = zeros((N, ))
+            btnew = zeros((N, ))
+            for j in range(0, N):
+                for i in range(0, N):
+                    tosum[i] = log(self.estimate_emission_probability(x, j)) + b[i] + log(transmat[i,j])
+                btnew[j] = logsumexp(tosum)
+            return btnew
+
+    def _forward_message(self, f, X):
+        N = self.n_classes
+        a = self.prior
+
+        alpha = zeros((N, ))
+        transmat = self.transmat
+
+        x = X[-1, :]
+
+        if X.shape[0] == 1:
+            for i in range(0, N):
+                alpha[i] = log(a[i]) + log(self.estimate_emission_probability(x, i))
+            return alpha
+        else:
+            tosum = zeros((N,))
+            ftnew = zeros((N,))
+            for j in range(0, N):
+                for i in range(0, N):
+                    tosum[i] = f[i] + log(transmat[i,j])
+                Sum = logsumexp(tosum)
+                bj = self.estimate_emission_probability(x, j)
+                ftnew[j] = Sum + log(bj)
+            return ftnew
+
+    def _forward(self, t, X):
+        N = self.n_classes
+        a = self.prior
+        alpha = zeros((N, ))
+        transmat = self.transmat
+        x = X[t, :]
+
+        if t == 0:
+            for i in range(0, N):
+                alpha[i] = log(a[i]) + log(self.estimate_emission_probability(x, i))
+            alpha = lognorm(alpha)
+            return alpha
+        else:
+            tosum = zeros((N, ))
+            ft = self._forward(t-1, X)
+            ftnew = zeros((N, ))
+            for j in range(0, N):
+                for i in range(0, N):
+                    tosum[i] = ft[i] + log(transmat[i,j])
+                Sum = logsumexp(tosum)
+                bj = self.estimate_emission_probability(x, j)
+                ftnew[j] = Sum + log(bj)
+            ftnew = lognorm(ftnew)
+
+            return ftnew
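+
+
+# A minimal usage sketch, assuming an SVM-like front end: HMMsvm expects a
+# classifier that exposes n_classes and estimate_posterior_probability_vector(x)
+# (such as the MyAVAClassifier mentioned in the class docstring). The
+# _PosteriorSVM below is a hypothetical stand-in built on sklearn.svm.SVC with
+# probability estimates, and the toy sequences are made up purely for
+# illustration. fit() estimates the prior and transition model from labelled
+# sequences and predict() decodes a state sequence with logviterbi().
+
+if __name__ == "__main__":
+    from numpy import array, hstack, vstack
+    from sklearn.svm import SVC
+
+    class _PosteriorSVM:
+        # Hypothetical wrapper exposing the interface HMMsvm expects
+        def __init__(self, n_classes):
+            self.n_classes = n_classes
+            self.clf = SVC(probability=True)
+
+        def fit(self, X, y):
+            self.clf.fit(X, y)
+
+        def estimate_posterior_probability_vector(self, x):
+            # Posterior class probabilities P(q|x) for a single observation
+            return self.clf.predict_proba(x.reshape(1, -1))[0]
+
+    # Two toy feature sequences and their annotated state sequences
+    features_list = [array([[0.0, 0.1], [0.2, 0.1], [0.1, 0.2],
+                            [0.9, 1.0], [1.1, 0.9], [1.0, 1.0]]),
+                     array([[0.1, 0.0], [0.2, 0.2], [1.0, 1.1],
+                            [0.8, 0.9], [0.9, 0.8], [0.1, 0.2]])]
+    states = [array([0, 0, 0, 1, 1, 1]), array([0, 0, 1, 1, 1, 0])]
+
+    # Train the frame-level classifier on the pooled frames
+    frame_svm = _PosteriorSVM(n_classes=2)
+    frame_svm.fit(vstack(features_list), hstack(states))
+
+    # Fit the HMM layer and decode one sequence
+    model = HMMsvm(frame_svm)
+    model.fit(features_list, states)
+    print model.predict(features_list[0])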