view experiment-reverb/code/cmodels.py @ 2:c87a9505f294 tip

Added LICENSE for code, removed .wav files
author Emmanouil Theofanis Chourdakis <e.t.chourdakis@qmul.ac.uk>
date Sat, 30 Sep 2017 13:25:50 +0100
parents 246d5546657c
children
line wrap: on
line source
#!/usr/bin/python2
# -*- coding: utf-8 -*-

"""
Classifier models
@author: Emmanouil Theofanis Chourdakis
"""


# Imports needed

from hmmlearn.hmm import GMM
from hmmlearn import hmm

from sklearn import svm
import copy as cp

from scipy.misc import logsumexp
import pickle

from collections import Counter

class HMMsvm:
    def __init__(self, svm_):
        self.svm = svm_


      #  self.svm = MyAVAClassifier()


    def fit(self, features_list, states, flr=1e-13):
        """Estimate the state prior and the transition matrix.

        Parameters
        ----------
        features_list : sequence
            One feature matrix per training sequence.  Each matrix is handed
            unchanged to ``logviterbi``.
        states : sequence
            One label sequence per training sequence.  Labels are used as
            row/column indices of the transition matrix, so they are assumed
            to be integers in ``0 .. n_classes-1`` (TODO confirm with callers).
        flr : float
            Floor added to every prior and transition probability before
            renormalising, so that no entry is exactly zero.

        Sets ``self.n_classes`` (copied from ``self.svm.n_classes``),
        ``self.prior`` and ``self.transmat``.

        Raises
        ------
        ValueError
            If ``states`` contains no labels at all.
        """
        self.n_classes = self.svm.n_classes
        n_classes = self.n_classes

        # --- State prior -----------------------------------------------------
        # One unit-width bin centred on every class index.  Letting histogram()
        # derive the bin range from the data would misplace the counts whenever
        # class 0 or class n_classes-1 does not occur in the labels.
        counts = zeros((n_classes, ))
        for s in states:
            counts += histogram(s, bins=n_classes,
                                range=(-0.5, n_classes - 0.5))[0]

        total = counts.sum()
        if total == 0:
            raise ValueError("fit() needs at least one labelled frame in `states`")

        # Add a very small probability for random jump, then renormalise.
        prior = counts/total + flr
        self.prior = prior/prior.sum()

        # --- Transition matrix from the annotated state sequences -------------
        transmat = self._fit_transition_frequencies(states, n_classes)

        # Add a very small probability for random jump to avoid zero values.
        transmat += flr
        self._fit_normalise_rows(transmat)
        self.transmat = transmat

        # --- Re-estimate the transitions from Viterbi decodings ---------------
        # NOTE(review): self.transmat is not modified inside this loop, so every
        # pass decodes with the same model.  Assuming logviterbi() is
        # deterministic, the loop ends after the pass that reproduces the
        # previous estimate.  Kept as in the original; confirm whether the
        # model was meant to be updated between passes.
        A = zeros((n_classes, n_classes))
        Aold = self.transmat

        while mse(A, Aold) > 10*A.size*flr:
            Aold = A.copy()
            decoded = [self.logviterbi(f)[0] for f in features_list]
            A = self._fit_transition_frequencies(decoded, n_classes)
            A += flr

        self._fit_normalise_rows(A)
        self.transmat = A

    def _fit_transition_frequencies(self, sequences, n_classes):
        """Return the relative frequency of each state-to-state transition."""
        counts = zeros((n_classes, n_classes))
        for seq in sequences:
            for k in range(1, len(seq)):
                # Decoded paths may be float arrays; array indices must be
                # integers.
                counts[int(seq[k-1]), int(seq[k])] += 1

        totals = counts.sum(axis=1)
        # States that are never left keep an all-zero row instead of 0/0.
        totals[totals == 0] = 1
        return counts/totals.reshape((-1, 1))

    def _fit_normalise_rows(self, matrix):
        """Scale, in place, every row with a positive sum so it sums to one."""
        for i in range(matrix.shape[0]):
            row_sum = matrix[i, :].sum()
            if row_sum > 0:
                matrix[i, :] = matrix[i, :]/row_sum


    def estimate_emission_probability(self, x, q):
        post = self.svm.estimate_posterior_probability_vector(x)
     #   print "post: ", post
        prior = self.prior
     #   print "prior: ", prior
        p = post/prior
        p = p/sum(p)

        return p[q]

 #   def estimate_emission_probability(self, x, q):
 #       return self.svm.estimate_emission_probability(q, x)


    def predict(self, X):
        return self.logviterbi(X)[0]


    def logviterbi(self, X):
        # Number of states

        N = self.n_classes

        # Number of observations



        T = X.shape[0]



        transmat = self.transmat

        #1. Initalization

        a1 = self.prior

        delta = zeros((T,N))
        psi = zeros((T,N))

        for i in range(0, N):
            delta[0, i] = log(self.transmat[0,i]) + log(self.estimate_emission_probability(X[0,:],i))


        #2. Recursion

        for t in range(1, T):
        #    delta_old = delta.copy()
            x = X[t, :]
            for j in range(0, N):
              #  print "j: %d, S" % j, delta_old+log(transmat[:,j])
                delta[t,j] = max(delta[t-1,:] + log(transmat[:,j])) + log(self.estimate_emission_probability(x,j))
                psi[t,j] = argmax(delta[t-1,:] + log(transmat[:,j]))

    #        print "t: %d, delta: " % t, delta
     #       print "t: %d, psi:" % t, psi


        # 3. Termination

        p_star = max(delta[T-1,:] + log(transmat[:,N-1]))
        q_star = argmax(delta[T-1,:] + log(transmat[:, N-1]))


        # 4. Backtracking

        q = zeros((T,))
        p = zeros((T,))

        q[-1] = q_star
        p[-1] = p_star
        for t in range(1, T):
            q[-t-1] = psi[-t,   q[-t]]
            p[-t-1] = delta[-t, q[-t]]


        return q,p

    def viterbi(self, X):
        # Number of states

        N = self.n_classes

        # Number of observations

        T = X.shape[0]

        transmat = self.transmat

        #1. Initialization

        a1 = self.prior

        delta = zeros((N, ))
        psi = zeros((N, ))

        for i in range(0, N):
            delta[i] = a1[i]*self.estimate_emission_probability(X[0,:],i)





        #2. Recursion

        for t in range(1, T):
            delta_old = delta.copy()
            x = X[t,:]
            for j in range(0, N):
                delta[j] = max(delta_old*transmat[:,j])*self.estimate_emission_probability(x, j)
                psi[j] = argmax(delta_old*transmat[:,j])

        #3. Termination

        p_star = max(delta*transmat[:,N-1])
        q_star = argmax(delta*transmat[:,N-1])



        #4. Backtracking

        q = zeros((T,))
        q[-1] = q_star

        for t in range(1, T):
            q[-t-1] = psi[q[-t]]

        return q


    def _log_likelihood_matrix(self, X):
        N = self.n_classes
        ll = zeros((X.shape[0], N))

        for i in range(0, X.shape[0]):
            ll[i,:] = self._forward(i, X)

        return ll

    def compute_ksus(self, X):
        N = self.n_classes
        T = X.shape[0]
        print "[II] Computing gammas... "

        gammas = self._forward_backward(X)

        # Initialize

        aij = self.transmat






    def _not_quite_ksu(self, t, i, j, X):
        alpha = exp(self.forward(X[0:t+1,:]))[i]
        beta = exp(self.backward(X[t+1:,:]))[j]

        aij = self.transmat[i,j]
        bj = self.estimate_emission_probability(X[t+1,:], j)

        return alpha*beta*aij*bj

    def _ksu(self, t, i, j, X):
        alphaT = exp(self.forward(X[0:t+1,:]))[0]

        return self._not_quite_ksu(t,i,j,X)


    def _forward_backward(self, X):
        """Combine forward and backward messages into per-frame state scores.

        Returns an array of shape ``(T, N)``.  Row ``t`` is ``lognorm`` applied
        to the sum of the forward message and the backward message of frame
        ``t``.

        Both messages are logarithms (``_forward_message`` and
        ``_backward_message`` build them from ``log`` terms), so they are
        combined by addition, and each backward message is paired with the
        forward message of the same frame.

        Assumes ``lognorm`` normalises a vector of log-values, as its use in
        ``_forward`` suggests -- TODO confirm.
        """
        T = X.shape[0]
        N = self.n_classes
        fv = zeros((T, N))
        sv = zeros((T, N))

        # Forward sweep: message t is built from message t-1 and the first t
        # frames.  The first call ignores its `f` argument.
        f = None
        for t in range(1, T+1):
            f = self._forward_message(f, X[0:t, :])
            fv[t-1, :] = f

        # Backward sweep: after t steps `b` is the message for frame T-t,
        # which is row -t of fv and sv.
        b = None
        for t in range(1, T+1):
            b = self._backward_message(b, X[-t:, :])
            sv[-t, :] = lognorm(fv[-t, :] + b)

        return sv


    def _backward(self, t, X):
        """Return the normalised backward log-message for frame ``t`` of ``X``.

        The message of the last frame is ``log(1/N)`` for every state.  Each
        earlier frame is derived from the message of the following frame and
        passed through ``lognorm``.

        Computed iteratively from the last frame down to ``t``, so long
        sequences do not hit the interpreter's recursion limit.

        NOTE(review): the update sums over the source index of ``transmat``
        and uses the emission of the current frame, mirroring the forward
        step.  This is kept exactly as written -- confirm against the
        intended backward recursion.

        Raises
        ------
        IndexError
            If ``t`` is not a valid row index of ``X``.
        """
        N = self.n_classes
        T = X.shape[0]
        if not -T <= t < T:
            raise IndexError("frame index %d is out of range for %d frames" % (t, T))

        log_transmat = log(self.transmat)

        # Message of the last frame: uniform over the states.
        beta = log(ones((N,))/N)

        for frame in range(T-2, t-1, -1):
            x = X[frame, :]
            btnew = zeros((N, ))
            for j in range(N):
                # Depends on j only, so it is evaluated once per target state.
                log_bj = log(self.estimate_emission_probability(x, j))
                btnew[j] = logsumexp([log_bj + beta[i] + log_transmat[i, j]
                                      for i in range(N)])
            beta = lognorm(btnew)

        return beta


    def score(self, X):
        return self.forward(X)

    def forward(self, X):
        T = X.shape[0]
        f = None
        for t in range(1, T+1):
            f = self._forward_message(f, X[0:t, :])

        return f

    def backward(self, X):
        T = X.shape[0]
        b = None
        for t in range(1,T+1):
         #   print t
    #        print t,b
            idx = arange(-t,T)
     #       print idx
            b = self._backward_message(b, X[-t:, :])

        return b

    def _backward_message(self, b, X):
        N = self.n_classes


        beta = zeros((N, ))
        transmat = self.transmat

        x = X[0, :]

        if X.shape[0] == 1:
            beta = log(ones((N,))/N)
            return beta
        else:
            tosum = zeros((N, ))
            btnew = zeros((N, ))
            for j in range(0, N):
                for i in range(0, N):
                    tosum[i] = log(self.estimate_emission_probability(x,j)) + b[i] + log(transmat[i,j])

                btnew[j] = logsumexp(tosum)
        #    btnew = lognorm(btnew)
            return btnew

    def _forward_message(self, f, X):
        N = self.n_classes
        a = self.prior

        alpha = zeros((N, ))
        transmat = self.transmat

        x = X[-1, :]

        if X.shape[0] == 1:
            for i in range(0, N):
                alpha[i] = log(a[i]) + log(self.estimate_emission_probability(x, i))
         #   alpha = lognorm(alpha)
            return alpha

        else:
            tosum = zeros((N,))
            ftnew = zeros((N,))
            for j in range(0, N):
                for i in range(0, N):
                    tosum[i] = f[i] + log(transmat[i,j])
                Sum = logsumexp(tosum)
                bj = self.estimate_emission_probability(x, j)
                ftnew[j] = Sum+log(bj)

        #    ftnew = lognorm(ftnew)
            return ftnew

    def _forward(self, t, X):
        """Return the normalised forward log-message for frame ``t`` of ``X``.

        Frame 0 combines ``log(prior)`` with the log emission probabilities of
        the first row.  Every later frame propagates the previous message
        through ``log(transmat)`` and adds the log emission probabilities of
        its own row.  Each message is passed through ``lognorm``.

        Computed iteratively from frame 0 up to ``t``, so long sequences do
        not hit the interpreter's recursion limit.

        Raises
        ------
        IndexError
            If ``t`` is not in ``0 .. X.shape[0]-1``.
        """
        N = self.n_classes
        T = X.shape[0]
        if not 0 <= t < T:
            raise IndexError("frame index %d is out of range for %d frames" % (t, T))

        a = self.prior
        log_transmat = log(self.transmat)

        # Frame 0.
        alpha = zeros((N, ))
        x = X[0, :]
        for i in range(N):
            alpha[i] = log(a[i]) + log(self.estimate_emission_probability(x, i))
        alpha = lognorm(alpha)

        # Frames 1 .. t.
        for frame in range(1, t+1):
            x = X[frame, :]
            ftnew = zeros((N, ))
            for j in range(N):
                Sum = logsumexp([alpha[i] + log_transmat[i, j] for i in range(N)])
                bj = self.estimate_emission_probability(x, j)
                ftnew[j] = Sum + log(bj)
            alpha = lognorm(ftnew)

        return alpha