diff experiment-reverb/code/models.py @ 0:246d5546657c

initial commit, needs cleanup
author Emmanouil Theofanis Chourdakis <e.t.chourdakis@qmul.ac.uk>
date Wed, 14 Dec 2016 13:15:48 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/experiment-reverb/code/models.py	Wed Dec 14 13:15:48 2016 +0000
@@ -0,0 +1,396 @@
+# -*- coding: utf-8 -*-
+"""
+Created on Tue Jul  7 11:21:52 2015
+
+@author: Emmanouil Theofanis Chourdakis
+"""
+
+import pickle
+from collections import Counter
+
+from numpy import *
+from numpy.random import randint
+
+from sklearn import svm
+from sklearn.base import BaseEstimator
+from sklearn.metrics import accuracy_score
+from sklearn.naive_bayes import GaussianNB
+from sklearn.svm import LinearSVC
+
+# NOTE(review): `hmm` (old sklearn.hmm / hmmlearn) and `HMMsvm` are also
+# used below but never imported or defined here — confirm where they come from.
+
+
+class SinkHoleClassifier(BaseEstimator):
+    def __init__(self,name="SinkholeClassifier", N=2, n_components=1):
+        self.classifierNB = NBClassifier()
+       # self.classifierSVM = MyAVAClassifier()
+        self.classifierSVM = LinearSVC(dual=False)
+        self.classifierHMM = HmmClassifier(N=N, n_components=n_components)
+        self.classifierHMMSVM = HMMsvmClassifier(N=N)
+        self.name = name
+
+    def score(self, features, parameters):
+        predicted_states = self.predict(features)
+        return accuracy_score(parameters, predicted_states)
+
+    def getName(self):
+        return self.name
+
+    def get_params(self, deep=True):
+        return {"N":self.chain_size}
+        
+    def set_params(self, **parameters):
+        for parameter, value in parameters.items():
+            self.setattr(parameter, value)
+        return self
+        
+    def fit(self, X, y):
+        self.n_classes = max(unique(y))+1
+
+        self.classifierNB.fit(X,y)
+        self.classifierSVM.fit(X,y)
+        self.classifierHMM.fit(X,y)
+        self.classifierHMMSVM.fit(X,y)
+
+
+    def predict(self, X):
+        predictedNB = self.classifierNB.predict(X)
+        predictedSVM = self.classifierSVM.predict(X)
+        predictedHMM = self.classifierHMM.predict(X)
+        predictedHMMSVM = self.classifierHMMSVM.predict(X)
+        
+
+
+
+        predicted = zeros((X.shape[0], ))
+
+        for i in range(0, len(predicted)):
+            candidates = [predictedNB[i], predictedSVM[i], predictedHMM[i], predictedHMMSVM[i], predictedLogReg[i]]
+            
+            c = Counter(candidates)
+
+            most_common = c.most_common()
+
+            # If there is an equal voting, select something at random
+            if len(unique([k[1] for k in most_common])) == 1:
+                predicted[i] = most_common[randint(len(most_common))][0]
+            else:
+                predicted[i] = most_common[0][0]
+
+        return predicted            
+class MyAVAClassifier:
+    """ All-versus-all (one-vs-one) polynomial-kernel SVM classifier.
+
+    One SVM is trained per unordered class pair; posteriors are obtained
+    by coupling the pairwise probabilities (formula below resembles the
+    Price/Hastie-Tibshirani pairwise-coupling rule — see
+    estimate_posterior_probability).  Degenerate label sets (one or two
+    distinct classes) are special-cased in fit()/predict(). """
+
+    def __init__(self):
+        # Maps an (i, j) class-label pair (stored with i < j) to the SVM
+        # trained to separate those two classes.
+        self.classifiers = {}
+        self.name  = "Linear SVM Classifier"
+        self.smallname = "svc-ava"
+
+
+    def getName(self):
+        return self.name
+    def fit(self, X, y, flr = 0, C=0.7):
+        """ Train the pairwise SVMs on (X, y).
+
+        flr: probability floor added to the transition matrix to avoid
+             zero entries.
+        C:   SVM regularization constant, passed to every pairwise SVM. """
+
+        n_classes = max(unique(y)) + 1
+
+        # Degenerate case 1: a single class present — remember its label
+        # and skip training entirely.
+        if len(unique(y)) == 1:
+            self.only_one_class = True
+            self.n_classes = 1
+            self.one_class_label = y[0]
+            return
+        elif len(unique(y)) == 2:
+            # Degenerate case 2: exactly two classes — a single SVM suffices.
+
+            self.n_classes = n_classes
+            self.svm = svm.SVC(decision_function_shape='ovr',degree=2,probability = True, kernel='poly', gamma=2, C = C)
+            self.svm.fit(X,y)
+            classes_ = unique(y)
+            self.classifiers[(classes_[0], classes_[1])] = self.svm
+            self.only_two_classes = True
+            self.only_one_class = False
+
+            return
+        else:
+            self.only_two_classes = False
+            self.only_one_class = False
+
+
+        classes = arange(0, n_classes)
+        self.n_classes = n_classes
+
+        # Empirical class prior from the label histogram.
+        h = histogram(y, n_classes)[0].astype(float)
+        self.prior = h/sum(h)
+
+        # First-order label transition counts, treating y as a sequence.
+        # NOTE(review): normalized by the GLOBAL sum, not per row, so rows
+        # are not stochastic — confirm this is what downstream users
+        # (e.g. the HMM-SVM hybrid) expect.
+        transmat = zeros((n_classes, n_classes))
+
+        for i in range(1, len(y)):
+            prev = y[i-1]
+            cur = y[i]
+            transmat[prev,cur] += 1
+
+        transmat = transmat/sum(transmat)
+
+        self.transmat = transmat
+
+        # Add a very small probability for random jump to avoid zero values
+
+        self.transmat += flr
+        self.transmat = self.transmat/sum(self.transmat)
+
+        # Train one SVM per unordered pair {i, j}; the `not in` checks
+        # ensure each pair is trained once, keyed with i < j.
+        for i in range(0, n_classes):
+            for j in range(0, n_classes):
+                if i != j and (i,j) not in self.classifiers and (j,i) not in self.classifiers:
+
+                    # Select only the samples belonging to class i or j.
+                    idx_ = bitwise_or(y == classes[i], y == classes[j])
+
+                    X_ = X[idx_, :]
+
+                    y_ = y[idx_]
+
+                    # A pair with fewer than two classes present cannot be
+                    # trained; such pairs stay absent from self.classifiers
+                    # and contribute probability 0.0 at prediction time.
+                    if len(unique(y_)) > 1:
+                        svm_ = svm.SVC(probability = True, kernel='poly', gamma=2, C = C)
+
+                        svm_.fit(X_, y_)
+                        self.classifiers[(i,j)] = svm_
+
+
+    def estimate_pairwise_class_probability(self, i, j, x):
+        """ P(class i | x) under the SVM trained for the pair {i, j}.
+
+        predict_proba columns are ordered by ascending class label, so for
+        a classifier stored under (j, i) with j < i, class i is column 1;
+        under (i, j) with i < j, class i is column 0.  Untrained pairs
+        return 0.0. """
+
+        if (i,j) not in self.classifiers and (j,i) in self.classifiers:
+            return  self.classifiers[(j,i)].predict_proba(x)[0,1]
+        elif (i,j) not in self.classifiers and (j,i) not in self.classifiers:
+            return 0.0
+        else:
+            return self.classifiers[(i,j)].predict_proba(x)[0,0]
+
+    def estimate_posterior_probability(self, i, x):
+        """ Couple the pairwise probabilities into a posterior for class i:
+        1 / (sum_j 1/p_ij - (K - 2)).  The 1e-18 floor guards against
+        division by zero for untrained pairs. """
+        mus = zeros((self.n_classes,))
+        for j in range(0, self.n_classes):
+            if i != j:
+                pcp = self.estimate_pairwise_class_probability(i,j,x)
+                pcp += 1e-18
+                mus[j] = 1/pcp
+        S = sum(mus) - (self.n_classes - 2)
+        return 1/S
+
+    def estimate_posterior_probability_vector(self, x):
+        """ Posterior estimates for every class, as a length-K vector. """
+        posterior = zeros((self.n_classes,))
+        for i in range(0, len(posterior)):
+            posterior[i] = self.estimate_posterior_probability(i, x)
+
+        return posterior
+
+
+    def predict(self, X):
+        """ Predict one label per row of X by maximum coupled posterior;
+        the one- and two-class degenerate fits short-circuit here. """
+        predicted = zeros((X.shape[0],))
+
+        if self.only_one_class == True:
+            return ones((X.shape[0],))*self.one_class_label
+        elif self.only_two_classes == True:
+            return self.svm.predict(X)
+
+
+        for i in range(0, X.shape[0]):
+            x = X[i,:]
+            P = zeros((self.n_classes,))
+
+
+            for c in range(0, len(P)):
+                P[c] =  self.estimate_posterior_probability(c, x)
+
+            pred = argmax(P)
+            predicted[i] = pred
+
+        return predicted
+            
+
+class NBClassifier:
+    def __init__(self):
+        print "[II] Gaussian Naive Bayes Classifier"
+        self.name = "Naive Bayes"
+        self.smallname = "gnbc"
+        self.gnb = GaussianNB()
+
+    def getName(self):
+        return self.name
+
+    def score(self, features, parameters):
+        predicted_states = self.predict(features)
+        return accuracy_score(parameters, predicted_states)
+
+    def fit(self, X, states):
+        self.gnb.fit(X, states)
+
+    def predict(self, X):
+        return self.gnb.predict(X)
+        
+
+class HmmClassifier(BaseEstimator):
+    def __init__(self, N=2,n_components = 1):
+        self.name = "HMM (%d time steps, %d components)" % (N, n_components)
+        self.n_components = n_components
+        self.chain_size = N
+   
+
+    def get_params(self, deep=True):
+        return {"N":self.chain_size, "n_components":self.n_components}
+        
+    def set_params(self, **parameters):
+        for parameter, value in parameters.items():
+            self.setattr(parameter, value)
+        return self
+
+    def getName(self):
+        return self.name
+
+    def score(self, features, parameters):
+        predicted_states = self.predict(features)
+        return accuracy_score(parameters, predicted_states)
+        
+    def fit(self, features, parameters):
+
+        n_classes = max(unique(parameters)) + 1
+
+        if n_classes == 1:
+            self.only_one_class = True
+            return
+        else:
+            self.only_one_class = False
+
+        hmms = [None]*n_classes
+ 
+        chain_size = self.chain_size
+        obs = [None]*n_classes
+
+        for i in range(chain_size, len(parameters)):
+            class_ = parameters[i]
+            seq = features[i-chain_size:i,:]
+
+
+            if obs[class_] is None:
+                obs[class_] = [seq]
+            else:
+                obs[class_].append(seq)
+
+
+
+        for i in range(0, len(obs)):
+       
+            if obs[i] is not None and len(obs[i]) != 0:
+                hmm_ = hmm.GaussianHMM(n_components=self.n_components, covariance_type='diag')
+                obs_ = concatenate(obs[i])
+                hmm_.fit(obs_, [self.chain_size]*len(obs[i]))
+
+                hmms[i] = hmm_
+
+        self.hmms = hmms
+
+        return obs
+
+    def predict(self, features, mfilt=20):
+
+        if self.only_one_class == True:
+            return zeros((features.shape[0], ))
+
+        chain_size = self.chain_size
+        hmms = self.hmms
+        predicted_classes = zeros((features.shape[0],))
+
+
+        for i in range(chain_size, features.shape[0]):
+            scores = zeros((len(hmms),))
+
+            seq = features[i-chain_size:i, :]
+
+            for j in range(0, len(hmms)):
+                if hmms[j] is not None:
+                    scores[j] = hmms[j].score(seq)
+                else:
+                    scores[j] = -infty
+
+            predicted_classes[i] = argmax(scores)
+
+
+        return predicted_classes
+        
+class HMMsvmClassifier(BaseEstimator):
+    def __init__(self, N=2):
+        self.classifiers = {}
+        self.name = "HMM-SVM Classifier"
+        self.obs = MyAVAClassifier()
+        self.chain_size = N
+
+    def get_params(self, deep=True):
+        return {"N":self.chain_size}
+        
+    def set_params(self, **parameters):
+        for parameter, value in parameters.items():
+            self.setattr(parameter, value)
+        return self
+        
+    def getName(self):
+        return self.name
+
+    def score(self, features, parameters):
+        predicted_states = self.predict(features)
+        return accuracy_score(parameters, predicted_states)
+        
+    def fit(self, X, y):
+        self.n_classes = max(unique(y))+1
+
+        self.obs.fit(X,y)
+        self.hmm = HMMsvm(self.obs)
+        self.hmm.fit([X],[y])
+
+    def predict(self, X):
+        return self.hmm.predict(X)
+
+    def confidence(self, x, q):
+        return self.hmm.estimate_emission_probability(x, q)        
+class ReverbModel:
+    """ Our Reverberation model, consists on the Principal Components Kernel,
+    the number of salient principal component dimensions, the list of salient 
+    features and the classifier itself. """
+    
+    
+    def __init__(self, name=None, kernel=None, q=None, feature_list=None, classifier=None, parameter_dictionary=None, moments_vector=None, filename=None):
+        
+        if filename is not None:
+            self.load(filename)
+            name = self.name
+            kernel = self.kernel
+            q = self.q
+            feature_list = self.feature_list
+            classifier = self.classifier
+            parameter_dictionary = self.parameter_dictionary
+            moments_vector = self.moments_vector
+        else:
+            if parameter_dictionary is None or name is None or kernel is None or q is None or feature_list is None or classifier is None:
+                raise Exception("Must supply name, kernel, q, feature_list and classifier or filename.")
+                
+                
+        print "[II] Initializing model `%s' with %dx%d PC kernel and features:" % (name,q,q)
+        print  str(feature_list).replace("', ","\n").replace('[','').replace("'","[II]\t ")[:-7]
+      #  print "[II] Using classifier: %s" % classifier.name
+        print "[II] And parameters dictionary:", parameter_dictionary
+        
+
+        self.name = name
+        
+        self.kernel = kernel
+        self.q = q
+        self.feature_list = feature_list
+        self.classifier = classifier 
+        self.parameter_dictionary = parameter_dictionary
+        self.moments_vector = moments_vector
+    
+    def save(self, filename):
+        print "[II] Saving model to: `%s.'" % filename
+        f = open(filename, 'wb')
+        pickle.dump(self, f)
+        f.close()
+    
+    def load(self, filename):
+        print "[II] Loading model from: `%s'." % filename
+        f = open(filename, 'rb')
+        new_model = pickle.load(f)
+        self.name = new_model.name
+        self.kernel = new_model.kernel
+        self.q = new_model.q
+        self.feature_list = new_model.feature_list
+        self.classifier = new_model.classifier
+        self.parameter_dictionary = new_model.parameter_dictionary
+        self.moments_vector = new_model.moments_vector
+        
+        f.close()
+    
\ No newline at end of file