e@0: # -*- coding: utf-8 -*- e@0: """ e@0: Created on Tue Jul 7 11:21:52 2015 e@0: e@0: @author: Emmanouil Theofanis Chourdakis e@0: """ e@0: e@0: from sklearn.base import BaseEstimator e@0: e@0: import pickle e@0: from numpy import * e@0: e@0: e@0: class SinkHoleClassifier(BaseEstimator): e@0: def __init__(self,name="SinkholeClassifier", N=2, n_components=1): e@0: self.classifierNB = NBClassifier() e@0: # self.classifierSVM = MyAVAClassifier() e@0: self.classifierSVM = LinearSVC(dual=False) e@0: self.classifierHMM = HmmClassifier(N=N, n_components=n_components) e@0: self.classifierHMMSVM = HMMsvmClassifier(N=N) e@0: self.name = name e@0: e@0: def score(self, features, parameters): e@0: predicted_states = self.predict(features) e@0: return accuracy_score(parameters, predicted_states) e@0: e@0: def getName(self): e@0: return self.name e@0: e@0: def get_params(self, deep=True): e@0: return {"N":self.chain_size} e@0: e@0: def set_params(self, **parameters): e@0: for parameter, value in parameters.items(): e@0: self.setattr(parameter, value) e@0: return self e@0: e@0: def fit(self, X, y): e@0: self.n_classes = max(unique(y))+1 e@0: e@0: self.classifierNB.fit(X,y) e@0: self.classifierSVM.fit(X,y) e@0: self.classifierHMM.fit(X,y) e@0: self.classifierHMMSVM.fit(X,y) e@0: e@0: e@0: def predict(self, X): e@0: predictedNB = self.classifierNB.predict(X) e@0: predictedSVM = self.classifierSVM.predict(X) e@0: predictedHMM = self.classifierHMM.predict(X) e@0: predictedHMMSVM = self.classifierHMMSVM.predict(X) e@0: e@0: e@0: e@0: e@0: predicted = zeros((X.shape[0], )) e@0: e@0: for i in range(0, len(predicted)): e@0: candidates = [predictedNB[i], predictedSVM[i], predictedHMM[i], predictedHMMSVM[i], predictedLogReg[i]] e@0: e@0: c = Counter(candidates) e@0: e@0: most_common = c.most_common() e@0: e@0: # If there is an equal voting, select something at random e@0: if len(unique([k[1] for k in most_common])) == 1: e@0: predicted[i] = most_common[randint(len(most_common))][0] e@0: else: e@0: predicted[i] = most_common[0][0] e@0: e@0: return predicted e@0: class MyAVAClassifier: e@0: e@0: def __init__(self): e@0: self.classifiers = {} e@0: self.name = "Linear SVM Classifier" e@0: self.smallname = "svc-ava" e@0: e@0: e@0: def getName(self): e@0: return self.name e@0: def fit(self, X, y, flr = 0, C=0.7): e@0: e@0: n_classes = max(unique(y)) + 1 e@0: e@0: if len(unique(y)) == 1: e@0: self.only_one_class = True e@0: self.n_classes = 1 e@0: self.one_class_label = y[0] e@0: return e@0: elif len(unique(y)) == 2: e@0: e@0: self.n_classes = n_classes e@0: self.svm = svm.SVC(decision_function_shape='ovr',degree=2,probability = True, kernel='poly', gamma=2, C = C) e@0: self.svm.fit(X,y) e@0: classes_ = unique(y) e@0: self.classifiers[(classes_[0], classes_[1])] = self.svm e@0: self.only_two_classes = True e@0: self.only_one_class = False e@0: e@0: return e@0: else: e@0: self.only_two_classes = False e@0: self.only_one_class = False e@0: e@0: e@0: classes = arange(0, n_classes) e@0: self.n_classes = n_classes e@0: e@0: h = histogram(y, n_classes)[0].astype(float) e@0: self.prior = h/sum(h) e@0: e@0: transmat = zeros((n_classes, n_classes)) e@0: e@0: for i in range(1, len(y)): e@0: prev = y[i-1] e@0: cur = y[i] e@0: transmat[prev,cur] += 1 e@0: e@0: transmat = transmat/sum(transmat) e@0: e@0: self.transmat = transmat e@0: e@0: # Add a very small probability for random jump to avoid zero values e@0: e@0: self.transmat += flr e@0: self.transmat = self.transmat/sum(self.transmat) e@0: e@0: for i in range(0, n_classes): e@0: for j in range(0, n_classes): e@0: if i != j and (i,j) not in self.classifiers and (j,i) not in self.classifiers: e@0: e@0: idx_ = bitwise_or(y == classes[i], y == classes[j]) e@0: e@0: X_ = X[idx_, :] e@0: e@0: y_ = y[idx_] e@0: e@0: if len(unique(y_)) > 1: e@0: svm_ = svm.SVC(probability = True, kernel='poly', gamma=2, C = C) e@0: e@0: svm_.fit(X_, y_) e@0: self.classifiers[(i,j)] = svm_ e@0: e@0: e@0: def estimate_pairwise_class_probability(self, i, j, x): e@0: e@0: e@0: if (i,j) not in self.classifiers and (j,i) in self.classifiers: e@0: return self.classifiers[(j,i)].predict_proba(x)[0,1] e@0: elif (i,j) not in self.classifiers and (j,i) not in self.classifiers: e@0: return 0.0 e@0: else: e@0: return self.classifiers[(i,j)].predict_proba(x)[0,0] e@0: e@0: def estimate_posterior_probability(self, i, x): e@0: mus = zeros((self.n_classes,)) e@0: for j in range(0, self.n_classes): e@0: if i != j: e@0: pcp = self.estimate_pairwise_class_probability(i,j,x) e@0: pcp += 1e-18 e@0: mus[j] = 1/pcp e@0: S = sum(mus) - (self.n_classes - 2) e@0: return 1/S e@0: e@0: def estimate_posterior_probability_vector(self, x): e@0: posterior = zeros((self.n_classes,)) e@0: for i in range(0, len(posterior)): e@0: posterior[i] = self.estimate_posterior_probability(i, x) e@0: e@0: return posterior e@0: e@0: e@0: def predict(self, X): e@0: predicted = zeros((X.shape[0],)) e@0: e@0: if self.only_one_class == True: e@0: return ones((X.shape[0],))*self.one_class_label e@0: elif self.only_two_classes == True: e@0: return self.svm.predict(X) e@0: e@0: e@0: for i in range(0, X.shape[0]): e@0: x = X[i,:] e@0: P = zeros((self.n_classes,)) e@0: e@0: e@0: for c in range(0, len(P)): e@0: P[c] = self.estimate_posterior_probability(c, x) e@0: e@0: pred = argmax(P) e@0: predicted[i] = pred e@0: e@0: return predicted e@0: e@0: e@0: class NBClassifier: e@0: def __init__(self): e@0: print "[II] Gaussian Naive Bayes Classifier" e@0: self.name = "Naive Bayes" e@0: self.smallname = "gnbc" e@0: self.gnb = GaussianNB() e@0: e@0: def getName(self): e@0: return self.name e@0: e@0: def score(self, features, parameters): e@0: predicted_states = self.predict(features) e@0: return accuracy_score(parameters, predicted_states) e@0: e@0: def fit(self, X, states): e@0: self.gnb.fit(X, states) e@0: e@0: def predict(self, X): e@0: return self.gnb.predict(X) e@0: e@0: e@0: class HmmClassifier(BaseEstimator): e@0: def __init__(self, N=2,n_components = 1): e@0: self.name = "HMM (%d time steps, %d components)" % (N, n_components) e@0: self.n_components = n_components e@0: self.chain_size = N e@0: e@0: e@0: def get_params(self, deep=True): e@0: return {"N":self.chain_size, "n_components":self.n_components} e@0: e@0: def set_params(self, **parameters): e@0: for parameter, value in parameters.items(): e@0: self.setattr(parameter, value) e@0: return self e@0: e@0: def getName(self): e@0: return self.name e@0: e@0: def score(self, features, parameters): e@0: predicted_states = self.predict(features) e@0: return accuracy_score(parameters, predicted_states) e@0: e@0: def fit(self, features, parameters): e@0: e@0: n_classes = max(unique(parameters)) + 1 e@0: e@0: if n_classes == 1: e@0: self.only_one_class = True e@0: return e@0: else: e@0: self.only_one_class = False e@0: e@0: hmms = [None]*n_classes e@0: e@0: chain_size = self.chain_size e@0: obs = [None]*n_classes e@0: e@0: for i in range(chain_size, len(parameters)): e@0: class_ = parameters[i] e@0: seq = features[i-chain_size:i,:] e@0: e@0: e@0: if obs[class_] is None: e@0: obs[class_] = [seq] e@0: else: e@0: obs[class_].append(seq) e@0: e@0: e@0: e@0: for i in range(0, len(obs)): e@0: e@0: if obs[i] is not None and len(obs[i]) != 0: e@0: hmm_ = hmm.GaussianHMM(n_components=self.n_components, covariance_type='diag') e@0: obs_ = concatenate(obs[i]) e@0: hmm_.fit(obs_, [self.chain_size]*len(obs[i])) e@0: e@0: hmms[i] = hmm_ e@0: e@0: self.hmms = hmms e@0: e@0: return obs e@0: e@0: def predict(self, features, mfilt=20): e@0: e@0: if self.only_one_class == True: e@0: return zeros((features.shape[0], )) e@0: e@0: chain_size = self.chain_size e@0: hmms = self.hmms e@0: predicted_classes = zeros((features.shape[0],)) e@0: e@0: e@0: for i in range(chain_size, features.shape[0]): e@0: scores = zeros((len(hmms),)) e@0: e@0: seq = features[i-chain_size:i, :] e@0: e@0: for j in range(0, len(hmms)): e@0: if hmms[j] is not None: e@0: scores[j] = hmms[j].score(seq) e@0: else: e@0: scores[j] = -infty e@0: e@0: predicted_classes[i] = argmax(scores) e@0: e@0: e@0: return predicted_classes e@0: e@0: class HMMsvmClassifier(BaseEstimator): e@0: def __init__(self, N=2): e@0: self.classifiers = {} e@0: self.name = "HMM-SVM Classifier" e@0: self.obs = MyAVAClassifier() e@0: self.chain_size = N e@0: e@0: def get_params(self, deep=True): e@0: return {"N":self.chain_size} e@0: e@0: def set_params(self, **parameters): e@0: for parameter, value in parameters.items(): e@0: self.setattr(parameter, value) e@0: return self e@0: e@0: def getName(self): e@0: return self.name e@0: e@0: def score(self, features, parameters): e@0: predicted_states = self.predict(features) e@0: return accuracy_score(parameters, predicted_states) e@0: e@0: def fit(self, X, y): e@0: self.n_classes = max(unique(y))+1 e@0: e@0: self.obs.fit(X,y) e@0: self.hmm = HMMsvm(self.obs) e@0: self.hmm.fit([X],[y]) e@0: e@0: def predict(self, X): e@0: return self.hmm.predict(X) e@0: e@0: def confidence(self, x, q): e@0: return self.hmm.estimate_emission_probability(x, q) e@0: class ReverbModel: e@0: """ Our Reverberation model, consists on the Principal Components Kernel, e@0: the number of salient principal component dimensions, the list of salient e@0: features and the classifier itself. """ e@0: e@0: e@0: def __init__(self, name=None, kernel=None, q=None, feature_list=None, classifier=None, parameter_dictionary=None, moments_vector=None, filename=None): e@0: e@0: if filename is not None: e@0: self.load(filename) e@0: name = self.name e@0: kernel = self.kernel e@0: q = self.q e@0: feature_list = self.feature_list e@0: classifier = self.classifier e@0: parameter_dictionary = self.parameter_dictionary e@0: moments_vector = self.moments_vector e@0: else: e@0: if parameter_dictionary is None or name is None or kernel is None or q is None or feature_list is None or classifier is None: e@0: raise Exception("Must supply name, kernel, q, feature_list and classifier or filename.") e@0: e@0: e@0: print "[II] Initializing model `%s' with %dx%d PC kernel and features:" % (name,q,q) e@0: print str(feature_list).replace("', ","\n").replace('[','').replace("'","[II]\t ")[:-7] e@0: # print "[II] Using classifier: %s" % classifier.name e@0: print "[II] And parameters dictionary:", parameter_dictionary e@0: e@0: e@0: self.name = name e@0: e@0: self.kernel = kernel e@0: self.q = q e@0: self.feature_list = feature_list e@0: self.classifier = classifier e@0: self.parameter_dictionary = parameter_dictionary e@0: self.moments_vector = moments_vector e@0: e@0: def save(self, filename): e@0: print "[II] Saving model to: `%s.'" % filename e@0: f = open(filename, 'wb') e@0: pickle.dump(self, f) e@0: f.close() e@0: e@0: def load(self, filename): e@0: print "[II] Loading model from: `%s'." % filename e@0: f = open(filename, 'rb') e@0: new_model = pickle.load(f) e@0: self.name = new_model.name e@0: self.kernel = new_model.kernel e@0: self.q = new_model.q e@0: self.feature_list = new_model.feature_list e@0: self.classifier = new_model.classifier e@0: self.parameter_dictionary = new_model.parameter_dictionary e@0: self.moments_vector = new_model.moments_vector e@0: e@0: f.close() e@0: