view experiment-reverb/code/models.py @ 2:c87a9505f294 tip

Added LICENSE for code, removed .wav files
author Emmanouil Theofanis Chourdakis <e.t.chourdakis@qmul.ac.uk>
date Sat, 30 Sep 2017 13:25:50 +0100
parents 246d5546657c
children
line wrap: on
line source
# -*- coding: utf-8 -*-
"""
Created on Tue Jul  7 11:21:52 2015

@author: Emmanouil Theofanis Chourdakis
"""

import pickle
from collections import Counter

from numpy import *
from numpy.random import randint

from sklearn.base import BaseEstimator
from sklearn import svm
from sklearn.svm import LinearSVC
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import accuracy_score

# NOTE(review): `hmm` (hmm.GaussianHMM) and `HMMsvm` are also referenced in
# this module but have no import here -- presumably hmmlearn and a project
# module; confirm and add the correct imports.


class SinkHoleClassifier(BaseEstimator):
    """Majority-vote ensemble over four classifiers: Gaussian Naive Bayes,
    a linear SVM, an HMM classifier and an HMM-SVM hybrid.

    predict() lets each member classifier vote per sample; full ties are
    broken uniformly at random.
    """

    def __init__(self, name="SinkholeClassifier", N=2, n_components=1):
        self.classifierNB = NBClassifier()
        self.classifierSVM = LinearSVC(dual=False)
        self.classifierHMM = HmmClassifier(N=N, n_components=n_components)
        self.classifierHMMSVM = HMMsvmClassifier(N=N)
        self.name = name
        # fix: chain_size was never stored, so get_params() raised
        # AttributeError; mirror HmmClassifier's convention ("N" -> chain_size).
        self.chain_size = N
        self.n_components = n_components

    def score(self, features, parameters):
        """Accuracy of self.predict(features) against the true labels."""
        predicted_states = self.predict(features)
        return accuracy_score(parameters, predicted_states)

    def getName(self):
        return self.name

    def get_params(self, deep=True):
        return {"N":self.chain_size}

    def set_params(self, **parameters):
        # fix: was `self.setattr(parameter, value)` -- instances have no
        # .setattr method, so any set_params() call raised AttributeError.
        for parameter, value in parameters.items():
            if parameter == "N":
                # "N" is the public name of the chain_size attribute.
                self.chain_size = value
            else:
                setattr(self, parameter, value)
        return self

    def fit(self, X, y):
        """Fit all four member classifiers on the same data."""
        self.n_classes = max(unique(y))+1

        self.classifierNB.fit(X,y)
        self.classifierSVM.fit(X,y)
        self.classifierHMM.fit(X,y)
        self.classifierHMMSVM.fit(X,y)

    def predict(self, X):
        """Majority vote of the member classifiers, one label per row of X."""
        predictedNB = self.classifierNB.predict(X)
        predictedSVM = self.classifierSVM.predict(X)
        predictedHMM = self.classifierHMM.predict(X)
        predictedHMMSVM = self.classifierHMMSVM.predict(X)

        predicted = zeros((X.shape[0], ))

        for i in range(0, len(predicted)):
            # fix: the original also referenced predictedLogReg here, which is
            # never computed anywhere in this class -- every call raised
            # NameError. Only the four fitted classifiers vote.
            candidates = [predictedNB[i], predictedSVM[i],
                          predictedHMM[i], predictedHMMSVM[i]]

            c = Counter(candidates)

            most_common = c.most_common()

            # If there is an equal voting, select something at random
            if len(unique([k[1] for k in most_common])) == 1:
                predicted[i] = most_common[randint(len(most_common))][0]
            else:
                predicted[i] = most_common[0][0]

        return predicted
class MyAVAClassifier:
    """All-vs-all (pairwise) SVM classifier.

    One poly-kernel SVC is trained per unordered pair of classes; a posterior
    per class is assembled from the pairwise probabilities (pairwise-coupling
    style) and predict() returns the argmax class. Label sets with only one
    or two distinct classes take dedicated shortcut paths.
    """

    def __init__(self):
        # (i, j) -> fitted SVC for the class pair i vs j
        self.classifiers = {}
        self.name  = "Linear SVM Classifier"
        self.smallname = "svc-ava"


    def getName(self):
        return self.name
    def fit(self, X, y, flr = 0, C=0.7):
        """Train the pairwise classifiers.

        X: feature matrix (samples x features); y: integer class labels
        (assumed to lie in 0..max(y)). flr: small floor added to the
        transition matrix; C: SVC penalty parameter.
        """

        n_classes = max(unique(y)) + 1

        if len(unique(y)) == 1:
            # Degenerate case: one class only -- remember its label and
            # always predict it.
            self.only_one_class = True
            self.n_classes = 1
            self.one_class_label = y[0]
            return
        elif len(unique(y)) == 2:

            # Two classes: a single SVC suffices; predict() delegates to it.
            self.n_classes = n_classes
            self.svm = svm.SVC(decision_function_shape='ovr',degree=2,probability = True, kernel='poly', gamma=2, C = C)
            self.svm.fit(X,y)
            classes_ = unique(y)
            self.classifiers[(classes_[0], classes_[1])] = self.svm
            self.only_two_classes = True
            self.only_one_class = False

            return
        else:
            self.only_two_classes = False
            self.only_one_class = False


        classes = arange(0, n_classes)
        self.n_classes = n_classes

        # Empirical class prior (stored, but not used by predict()).
        h = histogram(y, n_classes)[0].astype(float)
        self.prior = h/sum(h)

        # First-order label transition counts over consecutive samples.
        transmat = zeros((n_classes, n_classes))

        for i in range(1, len(y)):
            prev = y[i-1]
            cur = y[i]
            transmat[prev,cur] += 1

        # NOTE(review): normalized by the grand total, not per row, so rows
        # are not individual probability distributions -- confirm intent.
        transmat = transmat/sum(transmat)

        self.transmat = transmat

        # Add a very small probability for random jump to avoid zero values

        self.transmat += flr
        self.transmat = self.transmat/sum(self.transmat)

        # One probability-enabled SVC per unordered class pair (i, j),
        # trained only on the samples labeled i or j.
        for i in range(0, n_classes):
            for j in range(0, n_classes):
                if i != j and (i,j) not in self.classifiers and (j,i) not in self.classifiers:

                    idx_ = bitwise_or(y == classes[i], y == classes[j])

                    X_ = X[idx_, :]

                    y_ = y[idx_]

                    # A pair classifier only makes sense if both labels occur.
                    if len(unique(y_)) > 1:
                        svm_ = svm.SVC(probability = True, kernel='poly', gamma=2, C = C)

                        svm_.fit(X_, y_)
                        self.classifiers[(i,j)] = svm_


    def estimate_pairwise_class_probability(self, i, j, x):
        """Estimated probability that sample x is class i rather than class j.

        Falls back to the (j, i) classifier with the complementary column when
        (i, j) was not trained; returns 0.0 when neither pair exists (one of
        the labels never occurred in the training data).
        """


        if (i,j) not in self.classifiers and (j,i) in self.classifiers:
            return  self.classifiers[(j,i)].predict_proba(x)[0,1]
        elif (i,j) not in self.classifiers and (j,i) not in self.classifiers:
            return 0.0
        else:
            return self.classifiers[(i,j)].predict_proba(x)[0,0]

    def estimate_posterior_probability(self, i, x):
        """Combine pairwise probabilities into a posterior estimate for
        class i; the 1e-18 term guards the reciprocal against division by 0."""
        mus = zeros((self.n_classes,))
        for j in range(0, self.n_classes):
            if i != j:
                pcp = self.estimate_pairwise_class_probability(i,j,x)
                pcp += 1e-18
                mus[j] = 1/pcp
        S = sum(mus) - (self.n_classes - 2)
        return 1/S

    def estimate_posterior_probability_vector(self, x):
        """Posterior estimate for every class, as a vector."""
        posterior = zeros((self.n_classes,))
        for i in range(0, len(posterior)):
            posterior[i] = self.estimate_posterior_probability(i, x)

        return posterior


    def predict(self, X):
        """Predict one class label per row of X.

        Shortcut paths: constant label if training saw one class, plain SVC
        prediction if it saw exactly two.
        """
        predicted = zeros((X.shape[0],))

        if self.only_one_class == True:
            return ones((X.shape[0],))*self.one_class_label
        elif self.only_two_classes == True:
            return self.svm.predict(X)


        for i in range(0, X.shape[0]):
            x = X[i,:]
            P = zeros((self.n_classes,))


            for c in range(0, len(P)):
                P[c] =  self.estimate_posterior_probability(c, x)

            # Highest posterior wins.
            pred = argmax(P)
            predicted[i] = pred

        return predicted
            

class NBClassifier:
    def __init__(self):
        print "[II] Gaussian Naive Bayes Classifier"
        self.name = "Naive Bayes"
        self.smallname = "gnbc"
        self.gnb = GaussianNB()

    def getName(self):
        return self.name

    def score(self, features, parameters):
        predicted_states = self.predict(features)
        return accuracy_score(parameters, predicted_states)

    def fit(self, X, states):
        self.gnb.fit(X, states)

    def predict(self, X):
        return self.gnb.predict(X)
        

class HmmClassifier(BaseEstimator):
    """Classifier with one Gaussian HMM per class.

    fit() collects, for every frame i, the window of the N preceding feature
    frames and files it under frame i's class; each class then gets its own
    GaussianHMM. predict() scores every window under every class HMM and
    returns the best-scoring class per frame.
    """

    def __init__(self, N=2,n_components = 1):
        self.name = "HMM (%d time steps, %d components)" % (N, n_components)
        self.n_components = n_components
        # Window length; exposed through get_params/set_params as "N".
        self.chain_size = N

    def get_params(self, deep=True):
        return {"N":self.chain_size, "n_components":self.n_components}

    def set_params(self, **parameters):
        # fix: was `self.setattr(parameter, value)` -- instances have no
        # .setattr method, so any set_params() call raised AttributeError.
        for parameter, value in parameters.items():
            if parameter == "N":
                # "N" is the public name of the chain_size attribute.
                self.chain_size = value
            else:
                setattr(self, parameter, value)
        return self

    def getName(self):
        return self.name

    def score(self, features, parameters):
        """Accuracy of self.predict(features) against the true labels."""
        predicted_states = self.predict(features)
        return accuracy_score(parameters, predicted_states)

    def fit(self, features, parameters):
        """Train one HMM per class from windows of chain_size frames.

        features: (n_frames, n_features) array; parameters: per-frame integer
        labels. Returns the per-class lists of training windows (kept from
        the original implementation; callers may ignore it).
        """

        n_classes = max(unique(parameters)) + 1

        if n_classes == 1:
            # Only label 0 is present; predict() will return all zeros,
            # which is exactly that label.
            self.only_one_class = True
            return
        else:
            self.only_one_class = False

        hmms = [None]*n_classes

        chain_size = self.chain_size
        obs = [None]*n_classes

        # Window i covers frames [i - chain_size, i) and is labeled with the
        # class of frame i.
        for i in range(chain_size, len(parameters)):
            class_ = parameters[i]
            seq = features[i-chain_size:i,:]

            if obs[class_] is None:
                obs[class_] = [seq]
            else:
                obs[class_].append(seq)

        for i in range(0, len(obs)):
            # Classes with no windows keep hmms[i] = None and score -inf at
            # prediction time, so they can never be predicted.
            if obs[i] is not None and len(obs[i]) != 0:
                hmm_ = hmm.GaussianHMM(n_components=self.n_components, covariance_type='diag')
                obs_ = concatenate(obs[i])
                # Windows are concatenated into one matrix; the lengths list
                # tells the HMM where each chain_size-long sequence ends.
                hmm_.fit(obs_, [self.chain_size]*len(obs[i]))

                hmms[i] = hmm_

        self.hmms = hmms

        return obs

    def predict(self, features, mfilt=20):
        """Per-frame class prediction.

        The first chain_size frames have no complete window and stay at
        class 0. `mfilt` is currently unused; kept for interface
        compatibility with existing callers.
        """

        if self.only_one_class == True:
            return zeros((features.shape[0], ))

        chain_size = self.chain_size
        hmms = self.hmms
        predicted_classes = zeros((features.shape[0],))

        for i in range(chain_size, features.shape[0]):
            scores = zeros((len(hmms),))

            seq = features[i-chain_size:i, :]

            for j in range(0, len(hmms)):
                if hmms[j] is not None:
                    scores[j] = hmms[j].score(seq)
                else:
                    # Unseen class: can never win the argmax.
                    scores[j] = -infty

            predicted_classes[i] = argmax(scores)

        return predicted_classes
        
class HMMsvmClassifier(BaseEstimator):
    """Hybrid classifier: an all-vs-all SVM (MyAVAClassifier) supplies the
    emission model for an SVM-based HMM (HMMsvm, defined elsewhere in the
    project)."""

    def __init__(self, N=2):
        self.classifiers = {}
        self.name = "HMM-SVM Classifier"
        # Emission model; fitted first, then handed to HMMsvm in fit().
        self.obs = MyAVAClassifier()
        # Exposed through get_params/set_params as "N".
        self.chain_size = N

    def get_params(self, deep=True):
        return {"N":self.chain_size}

    def set_params(self, **parameters):
        # fix: was `self.setattr(parameter, value)` -- instances have no
        # .setattr method, so any set_params() call raised AttributeError.
        for parameter, value in parameters.items():
            if parameter == "N":
                # "N" is the public name of the chain_size attribute.
                self.chain_size = value
            else:
                setattr(self, parameter, value)
        return self

    def getName(self):
        return self.name

    def score(self, features, parameters):
        """Accuracy of self.predict(features) against the true labels."""
        predicted_states = self.predict(features)
        return accuracy_score(parameters, predicted_states)

    def fit(self, X, y):
        """Fit the SVM emission model on (X, y), then the HMM on top of it."""
        self.n_classes = max(unique(y))+1

        self.obs.fit(X,y)
        self.hmm = HMMsvm(self.obs)
        # HMMsvm takes lists of sequences; the whole set is one sequence here.
        self.hmm.fit([X],[y])

    def predict(self, X):
        return self.hmm.predict(X)

    def confidence(self, x, q):
        """Emission probability of observation x in state q (delegated to
        the underlying HMMsvm)."""
        return self.hmm.estimate_emission_probability(x, q)
class ReverbModel:
    """ Our Reverberation model, consists on the Principal Components Kernel,
    the number of salient principal component dimensions, the list of salient 
    features and the classifier itself. """
    
    
    def __init__(self, name=None, kernel=None, q=None, feature_list=None, classifier=None, parameter_dictionary=None, moments_vector=None, filename=None):
        
        if filename is not None:
            self.load(filename)
            name = self.name
            kernel = self.kernel
            q = self.q
            feature_list = self.feature_list
            classifier = self.classifier
            parameter_dictionary = self.parameter_dictionary
            moments_vector = self.moments_vector
        else:
            if parameter_dictionary is None or name is None or kernel is None or q is None or feature_list is None or classifier is None:
                raise Exception("Must supply name, kernel, q, feature_list and classifier or filename.")
                
                
        print "[II] Initializing model `%s' with %dx%d PC kernel and features:" % (name,q,q)
        print  str(feature_list).replace("', ","\n").replace('[','').replace("'","[II]\t ")[:-7]
      #  print "[II] Using classifier: %s" % classifier.name
        print "[II] And parameters dictionary:", parameter_dictionary
        

        self.name = name
        
        self.kernel = kernel
        self.q = q
        self.feature_list = feature_list
        self.classifier = classifier 
        self.parameter_dictionary = parameter_dictionary
        self.moments_vector = moments_vector
    
    def save(self, filename):
        print "[II] Saving model to: `%s.'" % filename
        f = open(filename, 'wb')
        pickle.dump(self, f)
        f.close()
    
    def load(self, filename):
        print "[II] Loading model from: `%s'." % filename
        f = open(filename, 'rb')
        new_model = pickle.load(f)
        self.name = new_model.name
        self.kernel = new_model.kernel
        self.q = new_model.q
        self.feature_list = new_model.feature_list
        self.classifier = new_model.classifier
        self.parameter_dictionary = new_model.parameter_dictionary
        self.moments_vector = new_model.moments_vector
        
        f.close()