annotate experiment-reverb/code/cmodels.py @ 2:c87a9505f294 tip

Added LICENSE for code, removed .wav files
author Emmanouil Theofanis Chourdakis <e.t.chourdakis@qmul.ac.uk>
date Sat, 30 Sep 2017 13:25:50 +0100
parents 246d5546657c
children
rev   line source
e@0 1 #!/usr/bin/python2
e@0 2 # -*- coding: utf-8 -*-
e@0 3
e@0 4 """
e@0 5 Classifier models
e@0 6 @author: Emmanouil Theofanis Chourdakis
e@0 7 """
e@0 8
e@0 9
e@0 10 # Imports needed
e@0 11
e@0 12 from hmmlearn.hmm import GMM
e@0 13 from hmmlearn import hmm
e@0 14
e@0 15 from sklearn import svm
e@0 16 import copy as cp
e@0 17
e@0 18 from scipy.misc import logsumexp
e@0 19 import pickle
e@0 20
e@0 21 from collections import Counter
e@0 22
e@0 23 class HMMsvm:
e@0 24 def __init__(self, svm_):
e@0 25 self.svm = svm_
e@0 26
e@0 27
e@0 28 # self.svm = MyAVAClassifier()
e@0 29
e@0 30
e@0 31 def fit(self, features_list, states, flr=1e-13):
e@0 32 # def fit(self, features_list, flr=1e-13):
e@0 33
e@0 34 # TODO: Concatenate features, train
e@0 35 #self.svm.fit(X, states, flr)
e@0 36 #self.prior = self.svm.prior
e@0 37 #self.transmat = self.svm.transmat
e@0 38
e@0 39
e@0 40 features = None
e@0 41
e@0 42 for f in features_list:
e@0 43 if features is None:
e@0 44 features = f
e@0 45 else:
e@0 46 features = append(features, f, 0)
e@0 47
e@0 48 fullstates = None
e@0 49 # T = features.shape[0]
e@0 50 for s in states:
e@0 51 if fullstates is None:
e@0 52 fullstates = s
e@0 53 else:
e@0 54 fullstates = append(fullstates, s)
e@0 55
e@0 56
e@0 57
e@0 58
e@0 59
e@0 60
e@0 61 self.n_classes = self.svm.n_classes
e@0 62 n_classes = self.n_classes
e@0 63
e@0 64 # print fullstates, shape(fullstates)
e@0 65
e@0 66 h = histogram(fullstates, n_classes)[0].astype(float)
e@0 67 self.prior = h/sum(h)
e@0 68
e@0 69 # Add a very small probability for random jump
e@0 70
e@0 71 self.prior += flr
e@0 72 self.prior = self.prior/sum(self.prior)
e@0 73
e@0 74 transmat = zeros((n_classes, n_classes))
e@0 75 transitions = zeros((n_classes, ))
e@0 76 for s in states:
e@0 77 for i in range(1, len(s)):
e@0 78 prev = s[i-1]
e@0 79 cur = s[i]
e@0 80 transmat[prev,cur] += 1
e@0 81 transitions[prev] += 1
e@0 82
e@0 83 transitions[transitions == 0] = 1
e@0 84
e@0 85 for i in range(0, transmat.shape[0]):
e@0 86 transmat[i,:] = transmat[i,:]/transitions[i]
e@0 87
e@0 88 self.transmat = transmat
e@0 89
e@0 90 # Add a very small probability for random jump to avoid zero values
e@0 91
e@0 92 self.transmat += flr
e@0 93
e@0 94 for i in range(0, self.transmat.shape[0]):
e@0 95 self.transmat[i,:] = self.transmat[i,:]/sum(self.transmat[i,:])
e@0 96
e@0 97
e@0 98 A = zeros((n_classes, n_classes))
e@0 99
e@0 100 Aold = self.transmat
e@0 101
e@0 102 while mse(A,Aold)>10*size(A)*flr:
e@0 103 Aold = A.copy()
e@0 104 A = zeros((n_classes, n_classes))
e@0 105 transitions = zeros((n_classes, ))
e@0 106
e@0 107 for i in range(0,len(features_list)):
e@0 108 f = features_list[i]
e@0 109 # print "FEATURES:", f
e@0 110 # print f
e@0 111 s,p = self.logviterbi(f)
e@0 112 # print s
e@0 113 for j in range(1, len(s)):
e@0 114 prev = s[j-1]
e@0 115 cur = s[j]
e@0 116 A[prev,cur] += 1
e@0 117 transitions[prev] += 1
e@0 118
e@0 119 transitions[transitions == 0] = 1
e@0 120
e@0 121
e@0 122 for i in range(0, A.shape[0]):
e@0 123 A[i,:] = A[i,:]/transitions[i]
e@0 124
e@0 125 A += flr
e@0 126
e@0 127
e@0 128
e@0 129 self.transmat = A
e@0 130
e@0 131 for i in range(0, A.shape[0]):
e@0 132 if sum(A[i,:]) > 0:
e@0 133 A[i,:] = A[i,:]/sum(A[i,:])
e@0 134
e@0 135
e@0 136
e@0 137 # print observations
e@0 138
e@0 139
e@0 140 def estimate_emission_probability(self, x, q):
e@0 141 post = self.svm.estimate_posterior_probability_vector(x)
e@0 142 # print "post: ", post
e@0 143 prior = self.prior
e@0 144 # print "prior: ", prior
e@0 145 p = post/prior
e@0 146 p = p/sum(p)
e@0 147
e@0 148 return p[q]
e@0 149
e@0 150 # def estimate_emission_probability(self, x, q):
e@0 151 # return self.svm.estimate_emission_probability(q, x)
e@0 152
e@0 153
e@0 154 def predict(self, X):
e@0 155 return self.logviterbi(X)[0]
e@0 156
e@0 157
e@0 158 def logviterbi(self, X):
e@0 159 # Number of states
e@0 160
e@0 161 N = self.n_classes
e@0 162
e@0 163 # Number of observations
e@0 164
e@0 165
e@0 166
e@0 167 T = X.shape[0]
e@0 168
e@0 169
e@0 170
e@0 171 transmat = self.transmat
e@0 172
e@0 173 #1. Initalization
e@0 174
e@0 175 a1 = self.prior
e@0 176
e@0 177 delta = zeros((T,N))
e@0 178 psi = zeros((T,N))
e@0 179
e@0 180 for i in range(0, N):
e@0 181 delta[0, i] = log(self.transmat[0,i]) + log(self.estimate_emission_probability(X[0,:],i))
e@0 182
e@0 183
e@0 184 #2. Recursion
e@0 185
e@0 186 for t in range(1, T):
e@0 187 # delta_old = delta.copy()
e@0 188 x = X[t, :]
e@0 189 for j in range(0, N):
e@0 190 # print "j: %d, S" % j, delta_old+log(transmat[:,j])
e@0 191 delta[t,j] = max(delta[t-1,:] + log(transmat[:,j])) + log(self.estimate_emission_probability(x,j))
e@0 192 psi[t,j] = argmax(delta[t-1,:] + log(transmat[:,j]))
e@0 193
e@0 194 # print "t: %d, delta: " % t, delta
e@0 195 # print "t: %d, psi:" % t, psi
e@0 196
e@0 197
e@0 198 # 3. Termination
e@0 199
e@0 200 p_star = max(delta[T-1,:] + log(transmat[:,N-1]))
e@0 201 q_star = argmax(delta[T-1,:] + log(transmat[:, N-1]))
e@0 202
e@0 203
e@0 204 # 4. Backtracking
e@0 205
e@0 206 q = zeros((T,))
e@0 207 p = zeros((T,))
e@0 208
e@0 209 q[-1] = q_star
e@0 210 p[-1] = p_star
e@0 211 for t in range(1, T):
e@0 212 q[-t-1] = psi[-t, q[-t]]
e@0 213 p[-t-1] = delta[-t, q[-t]]
e@0 214
e@0 215
e@0 216 return q,p
e@0 217
e@0 218 def viterbi(self, X):
e@0 219 # Number of states
e@0 220
e@0 221 N = self.n_classes
e@0 222
e@0 223 # Number of observations
e@0 224
e@0 225 T = X.shape[0]
e@0 226
e@0 227 transmat = self.transmat
e@0 228
e@0 229 #1. Initialization
e@0 230
e@0 231 a1 = self.prior
e@0 232
e@0 233 delta = zeros((N, ))
e@0 234 psi = zeros((N, ))
e@0 235
e@0 236 for i in range(0, N):
e@0 237 delta[i] = a1[i]*self.estimate_emission_probability(X[0,:],i)
e@0 238
e@0 239
e@0 240
e@0 241
e@0 242
e@0 243 #2. Recursion
e@0 244
e@0 245 for t in range(1, T):
e@0 246 delta_old = delta.copy()
e@0 247 x = X[t,:]
e@0 248 for j in range(0, N):
e@0 249 delta[j] = max(delta_old*transmat[:,j])*self.estimate_emission_probability(x, j)
e@0 250 psi[j] = argmax(delta_old*transmat[:,j])
e@0 251
e@0 252 #3. Termination
e@0 253
e@0 254 p_star = max(delta*transmat[:,N-1])
e@0 255 q_star = argmax(delta*transmat[:,N-1])
e@0 256
e@0 257
e@0 258
e@0 259 #4. Backtracking
e@0 260
e@0 261 q = zeros((T,))
e@0 262 q[-1] = q_star
e@0 263
e@0 264 for t in range(1, T):
e@0 265 q[-t-1] = psi[q[-t]]
e@0 266
e@0 267 return q
e@0 268
e@0 269
e@0 270 def _log_likelihood_matrix(self, X):
e@0 271 N = self.n_classes
e@0 272 ll = zeros((X.shape[0], N))
e@0 273
e@0 274 for i in range(0, X.shape[0]):
e@0 275 ll[i,:] = self._forward(i, X)
e@0 276
e@0 277 return ll
e@0 278
e@0 279 def compute_ksus(self, X):
e@0 280 N = self.n_classes
e@0 281 T = X.shape[0]
e@0 282 print "[II] Computing gammas... "
e@0 283
e@0 284 gammas = self._forward_backward(X)
e@0 285
e@0 286 # Initialize
e@0 287
e@0 288 aij = self.transmat
e@0 289
e@0 290
e@0 291
e@0 292
e@0 293
e@0 294
e@0 295 def _not_quite_ksu(self, t, i, j, X):
e@0 296 alpha = exp(self.forward(X[0:t+1,:]))[i]
e@0 297 beta = exp(self.backward(X[t+1:,:]))[j]
e@0 298
e@0 299 aij = self.transmat[i,j]
e@0 300 bj = self.estimate_emission_probability(X[t+1,:], j)
e@0 301
e@0 302 return alpha*beta*aij*bj
e@0 303
e@0 304 def _ksu(self, t, i, j, X):
e@0 305 alphaT = exp(self.forward(X[0:t+1,:]))[0]
e@0 306
e@0 307 return self._not_quite_ksu(t,i,j,X)
e@0 308
e@0 309
e@0 310 def _forward_backward(self, X):
e@0 311 T = X.shape[0]
e@0 312 N = self.n_classes
e@0 313 fv = zeros((T, N))
e@0 314 sv = zeros((T, N))
e@0 315
e@0 316 b = None
e@0 317 for t in range(1, T+1):
e@0 318
e@0 319 fv[t-1,:] = self._forward_message(fv[t-2,:], X[0:t, ])
e@0 320
e@0 321 for t in range(1, T+1):
e@0 322 b = self._backward_message(b, X[-t: , :])
e@0 323 sv[-t,:] = lognorm(fv[t-1,:]*b)
e@0 324
e@0 325 return sv
e@0 326
e@0 327
e@0 328 def _backward(self, t, X):
e@0 329 N = self.n_classes
e@0 330 a = ones((N,))/N
e@0 331
e@0 332 beta = zeros((N, ))
e@0 333 transmat = self.transmat
e@0 334 T = X.shape[0]
e@0 335 x = X[t, :]
e@0 336 if t == T-1:
e@0 337 beta = log(a)
e@0 338
e@0 339 return beta
e@0 340 else:
e@0 341 tosum = zeros((N, ))
e@0 342 bt = self._backward(t+1, X)
e@0 343 btnew = zeros((N, ))
e@0 344 # print bt
e@0 345 for j in range(0, N):
e@0 346 for i in range(0, N):
e@0 347 # print log(self.estimate_emission_probability(x, j)), bt[i], log(transmat[i,j])
e@0 348 tosum[i] = log(self.estimate_emission_probability(x, j)) + bt[i] + log(transmat[i,j])
e@0 349 # print tosum
e@0 350
e@0 351 btnew[j] = logsumexp(tosum)
e@0 352 btnew = lognorm(btnew)
e@0 353 return btnew
e@0 354
e@0 355
e@0 356 def score(self, X):
e@0 357 return self.forward(X)
e@0 358
e@0 359 def forward(self, X):
e@0 360 T = X.shape[0]
e@0 361 f = None
e@0 362 for t in range(1, T+1):
e@0 363 f = self._forward_message(f, X[0:t, :])
e@0 364
e@0 365 return f
e@0 366
e@0 367 def backward(self, X):
e@0 368 T = X.shape[0]
e@0 369 b = None
e@0 370 for t in range(1,T+1):
e@0 371 # print t
e@0 372 # print t,b
e@0 373 idx = arange(-t,T)
e@0 374 # print idx
e@0 375 b = self._backward_message(b, X[-t:, :])
e@0 376
e@0 377 return b
e@0 378
e@0 379 def _backward_message(self, b, X):
e@0 380 N = self.n_classes
e@0 381
e@0 382
e@0 383 beta = zeros((N, ))
e@0 384 transmat = self.transmat
e@0 385
e@0 386 x = X[0, :]
e@0 387
e@0 388 if X.shape[0] == 1:
e@0 389 beta = log(ones((N,))/N)
e@0 390 return beta
e@0 391 else:
e@0 392 tosum = zeros((N, ))
e@0 393 btnew = zeros((N, ))
e@0 394 for j in range(0, N):
e@0 395 for i in range(0, N):
e@0 396 tosum[i] = log(self.estimate_emission_probability(x,j)) + b[i] + log(transmat[i,j])
e@0 397
e@0 398 btnew[j] = logsumexp(tosum)
e@0 399 # btnew = lognorm(btnew)
e@0 400 return btnew
e@0 401
e@0 402 def _forward_message(self, f, X):
e@0 403 N = self.n_classes
e@0 404 a = self.prior
e@0 405
e@0 406 alpha = zeros((N, ))
e@0 407 transmat = self.transmat
e@0 408
e@0 409 x = X[-1, :]
e@0 410
e@0 411 if X.shape[0] == 1:
e@0 412 for i in range(0, N):
e@0 413 alpha[i] = log(a[i]) + log(self.estimate_emission_probability(x, i))
e@0 414 # alpha = lognorm(alpha)
e@0 415 return alpha
e@0 416
e@0 417 else:
e@0 418 tosum = zeros((N,))
e@0 419 ftnew = zeros((N,))
e@0 420 for j in range(0, N):
e@0 421 for i in range(0, N):
e@0 422 tosum[i] = f[i] + log(transmat[i,j])
e@0 423 Sum = logsumexp(tosum)
e@0 424 bj = self.estimate_emission_probability(x, j)
e@0 425 ftnew[j] = Sum+log(bj)
e@0 426
e@0 427 # ftnew = lognorm(ftnew)
e@0 428 return ftnew
e@0 429
e@0 430 def _forward(self, t, X):
e@0 431 N = self.n_classes
e@0 432 a = self.prior
e@0 433 # print a
e@0 434 alpha = zeros((N, ))
e@0 435 transmat = self.transmat
e@0 436 x = X[t, :]
e@0 437
e@0 438 if t == 0:
e@0 439 for i in range(0, N):
e@0 440 alpha[i] = log(a[i]) + log(self.estimate_emission_probability(x, i))
e@0 441 # print "--"
e@0 442 # print alpha
e@0 443 alpha = lognorm(alpha)
e@0 444 # print alpha
e@0 445 # print "xx"
e@0 446 return alpha
e@0 447 else:
e@0 448 tosum = zeros((N, ))
e@0 449 ft = self._forward(t-1, X)
e@0 450 ftnew = zeros((N, ))
e@0 451 for j in range(0, N):
e@0 452 for i in range(0, N):
e@0 453 # print ft
e@0 454 tosum[i] = ft[i] + log(transmat[i,j])
e@0 455
e@0 456 Sum = logsumexp(tosum)
e@0 457 bj = self.estimate_emission_probability(x, j)
e@0 458 ftnew[j] = Sum+log(bj)
e@0 459 ftnew = lognorm(ftnew)
e@0 460
e@0 461 return ftnew