Mercurial > hg > chourdakisreiss2016
diff experiment-reverb/code/models.py @ 0:246d5546657c
initial commit, needs cleanup
author | Emmanouil Theofanis Chourdakis <e.t.chourdakis@qmul.ac.uk> |
---|---|
date | Wed, 14 Dec 2016 13:15:48 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/experiment-reverb/code/models.py Wed Dec 14 13:15:48 2016 +0000 @@ -0,0 +1,396 @@ +# -*- coding: utf-8 -*- +""" +Created on Tue Jul 7 11:21:52 2015 + +@author: Emmanouil Theofanis Chourdakis +""" + +from sklearn.base import BaseEstimator + +import pickle +from numpy import * + + +class SinkHoleClassifier(BaseEstimator): + def __init__(self,name="SinkholeClassifier", N=2, n_components=1): + self.classifierNB = NBClassifier() + # self.classifierSVM = MyAVAClassifier() + self.classifierSVM = LinearSVC(dual=False) + self.classifierHMM = HmmClassifier(N=N, n_components=n_components) + self.classifierHMMSVM = HMMsvmClassifier(N=N) + self.name = name + + def score(self, features, parameters): + predicted_states = self.predict(features) + return accuracy_score(parameters, predicted_states) + + def getName(self): + return self.name + + def get_params(self, deep=True): + return {"N":self.chain_size} + + def set_params(self, **parameters): + for parameter, value in parameters.items(): + self.setattr(parameter, value) + return self + + def fit(self, X, y): + self.n_classes = max(unique(y))+1 + + self.classifierNB.fit(X,y) + self.classifierSVM.fit(X,y) + self.classifierHMM.fit(X,y) + self.classifierHMMSVM.fit(X,y) + + + def predict(self, X): + predictedNB = self.classifierNB.predict(X) + predictedSVM = self.classifierSVM.predict(X) + predictedHMM = self.classifierHMM.predict(X) + predictedHMMSVM = self.classifierHMMSVM.predict(X) + + + + + predicted = zeros((X.shape[0], )) + + for i in range(0, len(predicted)): + candidates = [predictedNB[i], predictedSVM[i], predictedHMM[i], predictedHMMSVM[i], predictedLogReg[i]] + + c = Counter(candidates) + + most_common = c.most_common() + + # If there is an equal voting, select something at random + if len(unique([k[1] for k in most_common])) == 1: + predicted[i] = most_common[randint(len(most_common))][0] + else: + predicted[i] = most_common[0][0] + + return predicted +class MyAVAClassifier: + + def __init__(self): + self.classifiers = {} + self.name = "Linear SVM Classifier" + self.smallname = "svc-ava" + + + def getName(self): + return self.name + def fit(self, X, y, flr = 0, C=0.7): + + n_classes = max(unique(y)) + 1 + + if len(unique(y)) == 1: + self.only_one_class = True + self.n_classes = 1 + self.one_class_label = y[0] + return + elif len(unique(y)) == 2: + + self.n_classes = n_classes + self.svm = svm.SVC(decision_function_shape='ovr',degree=2,probability = True, kernel='poly', gamma=2, C = C) + self.svm.fit(X,y) + classes_ = unique(y) + self.classifiers[(classes_[0], classes_[1])] = self.svm + self.only_two_classes = True + self.only_one_class = False + + return + else: + self.only_two_classes = False + self.only_one_class = False + + + classes = arange(0, n_classes) + self.n_classes = n_classes + + h = histogram(y, n_classes)[0].astype(float) + self.prior = h/sum(h) + + transmat = zeros((n_classes, n_classes)) + + for i in range(1, len(y)): + prev = y[i-1] + cur = y[i] + transmat[prev,cur] += 1 + + transmat = transmat/sum(transmat) + + self.transmat = transmat + + # Add a very small probability for random jump to avoid zero values + + self.transmat += flr + self.transmat = self.transmat/sum(self.transmat) + + for i in range(0, n_classes): + for j in range(0, n_classes): + if i != j and (i,j) not in self.classifiers and (j,i) not in self.classifiers: + + idx_ = bitwise_or(y == classes[i], y == classes[j]) + + X_ = X[idx_, :] + + y_ = y[idx_] + + if len(unique(y_)) > 1: + svm_ = svm.SVC(probability = True, kernel='poly', gamma=2, C = C) + + svm_.fit(X_, y_) + self.classifiers[(i,j)] = svm_ + + + def estimate_pairwise_class_probability(self, i, j, x): + + + if (i,j) not in self.classifiers and (j,i) in self.classifiers: + return self.classifiers[(j,i)].predict_proba(x)[0,1] + elif (i,j) not in self.classifiers and (j,i) not in self.classifiers: + return 0.0 + else: + return self.classifiers[(i,j)].predict_proba(x)[0,0] + + def estimate_posterior_probability(self, i, x): + mus = zeros((self.n_classes,)) + for j in range(0, self.n_classes): + if i != j: + pcp = self.estimate_pairwise_class_probability(i,j,x) + pcp += 1e-18 + mus[j] = 1/pcp + S = sum(mus) - (self.n_classes - 2) + return 1/S + + def estimate_posterior_probability_vector(self, x): + posterior = zeros((self.n_classes,)) + for i in range(0, len(posterior)): + posterior[i] = self.estimate_posterior_probability(i, x) + + return posterior + + + def predict(self, X): + predicted = zeros((X.shape[0],)) + + if self.only_one_class == True: + return ones((X.shape[0],))*self.one_class_label + elif self.only_two_classes == True: + return self.svm.predict(X) + + + for i in range(0, X.shape[0]): + x = X[i,:] + P = zeros((self.n_classes,)) + + + for c in range(0, len(P)): + P[c] = self.estimate_posterior_probability(c, x) + + pred = argmax(P) + predicted[i] = pred + + return predicted + + +class NBClassifier: + def __init__(self): + print "[II] Gaussian Naive Bayes Classifier" + self.name = "Naive Bayes" + self.smallname = "gnbc" + self.gnb = GaussianNB() + + def getName(self): + return self.name + + def score(self, features, parameters): + predicted_states = self.predict(features) + return accuracy_score(parameters, predicted_states) + + def fit(self, X, states): + self.gnb.fit(X, states) + + def predict(self, X): + return self.gnb.predict(X) + + +class HmmClassifier(BaseEstimator): + def __init__(self, N=2,n_components = 1): + self.name = "HMM (%d time steps, %d components)" % (N, n_components) + self.n_components = n_components + self.chain_size = N + + + def get_params(self, deep=True): + return {"N":self.chain_size, "n_components":self.n_components} + + def set_params(self, **parameters): + for parameter, value in parameters.items(): + self.setattr(parameter, value) + return self + + def getName(self): + return self.name + + def score(self, features, parameters): + predicted_states = self.predict(features) + return accuracy_score(parameters, predicted_states) + + def fit(self, features, parameters): + + n_classes = max(unique(parameters)) + 1 + + if n_classes == 1: + self.only_one_class = True + return + else: + self.only_one_class = False + + hmms = [None]*n_classes + + chain_size = self.chain_size + obs = [None]*n_classes + + for i in range(chain_size, len(parameters)): + class_ = parameters[i] + seq = features[i-chain_size:i,:] + + + if obs[class_] is None: + obs[class_] = [seq] + else: + obs[class_].append(seq) + + + + for i in range(0, len(obs)): + + if obs[i] is not None and len(obs[i]) != 0: + hmm_ = hmm.GaussianHMM(n_components=self.n_components, covariance_type='diag') + obs_ = concatenate(obs[i]) + hmm_.fit(obs_, [self.chain_size]*len(obs[i])) + + hmms[i] = hmm_ + + self.hmms = hmms + + return obs + + def predict(self, features, mfilt=20): + + if self.only_one_class == True: + return zeros((features.shape[0], )) + + chain_size = self.chain_size + hmms = self.hmms + predicted_classes = zeros((features.shape[0],)) + + + for i in range(chain_size, features.shape[0]): + scores = zeros((len(hmms),)) + + seq = features[i-chain_size:i, :] + + for j in range(0, len(hmms)): + if hmms[j] is not None: + scores[j] = hmms[j].score(seq) + else: + scores[j] = -infty + + predicted_classes[i] = argmax(scores) + + + return predicted_classes + +class HMMsvmClassifier(BaseEstimator): + def __init__(self, N=2): + self.classifiers = {} + self.name = "HMM-SVM Classifier" + self.obs = MyAVAClassifier() + self.chain_size = N + + def get_params(self, deep=True): + return {"N":self.chain_size} + + def set_params(self, **parameters): + for parameter, value in parameters.items(): + self.setattr(parameter, value) + return self + + def getName(self): + return self.name + + def score(self, features, parameters): + predicted_states = self.predict(features) + return accuracy_score(parameters, predicted_states) + + def fit(self, X, y): + self.n_classes = max(unique(y))+1 + + self.obs.fit(X,y) + self.hmm = HMMsvm(self.obs) + self.hmm.fit([X],[y]) + + def predict(self, X): + return self.hmm.predict(X) + + def confidence(self, x, q): + return self.hmm.estimate_emission_probability(x, q) +class ReverbModel: + """ Our Reverberation model, consists on the Principal Components Kernel, + the number of salient principal component dimensions, the list of salient + features and the classifier itself. """ + + + def __init__(self, name=None, kernel=None, q=None, feature_list=None, classifier=None, parameter_dictionary=None, moments_vector=None, filename=None): + + if filename is not None: + self.load(filename) + name = self.name + kernel = self.kernel + q = self.q + feature_list = self.feature_list + classifier = self.classifier + parameter_dictionary = self.parameter_dictionary + moments_vector = self.moments_vector + else: + if parameter_dictionary is None or name is None or kernel is None or q is None or feature_list is None or classifier is None: + raise Exception("Must supply name, kernel, q, feature_list and classifier or filename.") + + + print "[II] Initializing model `%s' with %dx%d PC kernel and features:" % (name,q,q) + print str(feature_list).replace("', ","\n").replace('[','').replace("'","[II]\t ")[:-7] + # print "[II] Using classifier: %s" % classifier.name + print "[II] And parameters dictionary:", parameter_dictionary + + + self.name = name + + self.kernel = kernel + self.q = q + self.feature_list = feature_list + self.classifier = classifier + self.parameter_dictionary = parameter_dictionary + self.moments_vector = moments_vector + + def save(self, filename): + print "[II] Saving model to: `%s.'" % filename + f = open(filename, 'wb') + pickle.dump(self, f) + f.close() + + def load(self, filename): + print "[II] Loading model from: `%s'." % filename + f = open(filename, 'rb') + new_model = pickle.load(f) + self.name = new_model.name + self.kernel = new_model.kernel + self.q = new_model.q + self.feature_list = new_model.feature_list + self.classifier = new_model.classifier + self.parameter_dictionary = new_model.parameter_dictionary + self.moments_vector = new_model.moments_vector + + f.close() + \ No newline at end of file