Mercurial > hg > chourdakisreiss2016
view experiment-reverb/code/training_bak.py @ 2:c87a9505f294 tip
Added LICENSE for code, removed .wav files
author | Emmanouil Theofanis Chourdakis <e.t.chourdakis@qmul.ac.uk> |
---|---|
date | Sat, 30 Sep 2017 13:25:50 +0100 |
parents | 246d5546657c |
children |
line wrap: on
line source
#!/usr/bin/python2 # -*- coding: utf-8 -*- """ Created on Thu Apr 23 11:53:17 2015 @author: mmxgn """ # This file does the cluster estimation and the removal of outliers from sys import argv, exit from essentia.standard import YamlInput, YamlOutput from essentia import Pool from pca import * from numpy import * from sklearn import cluster from sklearn.metrics import pairwise_distances from sklearn.cluster import KMeans, MiniBatchKMeans from matplotlib.pyplot import * #from sklearn.mixture import GMM from sklearn.naive_bayes import GaussianNB, MultinomialNB from scipy.signal import decimate from sklearn import cross_validation from sklearn import multiclass #from hmmlearn import hmm from hmmlearn.hmm import GMM from hmmlearn import hmm from sklearn import svm import copy as cp from scipy.misc import logsumexp import pickle #from adpcm import adm, adm_reconstruct mse = lambda A,B: ((array(A)-array(B)) ** 2).mean() lognorm = lambda A: A - logsumexp(A) #logsum = lambda X: logsumexp(log(X)) def logsum(X): if len(X) == 1: return log(X[0]) else: return logaddexp(log(X[0]),logsum(X[1:])) def smooth_matrix_1D(X): window = scipy.signal.gaussian(51,4) window = window/sum(window) intermx = zeros((X.shape[0],X.shape[1]+100)) intermx[:, 50:-50] = X for m in range(0, X.shape[0]): # print intermx.shape intermx[m,:] = convolve(ravel(intermx[m,:]), window,'same') return intermx[:,50:-50] def adm_reconstruct(codeword, h, dmin=.01, dmax=.28): x = zeros((1, codeword.shape[1])) delta1 = dmin delta2 = dmin Sum = h x[0] = h for i in range(0, codeword.shape[1]): if codeword[0,i] == 0: delta1 = dmin delta2 = dmin elif codeword[0,i] == 1: delta2 = dmin Sum += delta1 delta1 *= 2 if delta1 > dmax: delta1 = dmax elif codeword[0,i] == -1: delta1 = dmin Sum -= delta2 delta2 *= 2 if delta2 > dmax: delta2 = dmax x[0,i] = Sum return x def adm(x, dmin=.01, dmax=.28, tol=0.0001): # Adaptive delta modulation adapted by code: # (adeltamod.m) # # Adaptive Delta Modulator # by Gandhar Desai (gdesai) # BITS Pilani Goa Campus # Date: 28 Sept, 2013 xsig = x Lx = len(x) ADMout = zeros((1, Lx)) codevec = zeros((1, Lx)) Sum = x[0] delta1 = dmin delta2 = dmin mult1 = 2 mult2 = 2 for i in range(0, Lx): #print abs(xsig[i] - Sum) if (abs(xsig[i] - Sum) < tol): bit = 0 delta2 = dmin delta1 = dmin elif (xsig[i] >= Sum): bit = 1 delta2 = dmin Sum += delta1 delta1 *= mult1 if delta1 > dmax: delta1 = dmax else: bit = -1 delta1 = dmin Sum -= delta2 delta2 *= mult2 if delta2 > dmax: delta2 = dmax ADMout[0, i] = Sum codevec[0, i]= bit return ADMout,codevec, x[0] def median_filter(v, K): v2 = zeros((len(v),)) for i in range(K, len(v)): seq = v[i-K:i] m = median(seq) v2[i-K:i] = m return v2 from models import ReverbModel if __name__=="__main__": if len(argv) != 2: print "[EE] Wrong number of arguments" print "[II] Correct syntax is:" print "[II] \t%s <trainingdir>" print "[II] where <trainingdir> is the directory containing the features files in the format *_features.yaml" print "[II] and the parameters in the format *_parameters.yaml" exit(-1) n_clusters = 3 UpsamplingFactor = 1 dmin = 0.001 dmax = 0.28 tol = 0.001 d1min = 0.01 d1max = 0.1 g1min = 0.01 g1max = 0.99 damin = 0.006 damax = 0.012 Gmin = 0.01 Gmax = 0.99 gcmin = 0.01 gcmax = 0.99 # For searching the directory from glob import glob traindir = argv[1] songs_in_dir = glob("%s/*_features.yaml" % traindir) print "[II] Using files: %s" % songs_in_dir chain_length=1 # asdsd # fullfeatures_pool = Pool() features_vector = None parameters_vector = None index_vector = None idx = 0 for f_ in songs_in_dir: infile = f_ paramfile = "%s_parameters.yaml" % f_.split('_features.yaml')[0] print "[II] Using features file: %s" % infile print "[II] and parameters file: %s" % paramfile # paramfile = infile.split(') features_pool = YamlInput(filename = infile)() parameters_pool = YamlInput(filename = paramfile)() d1 = parameters_pool['parameters.d1'] da = parameters_pool['parameters.da'] g1 = parameters_pool['parameters.g1'] gc = parameters_pool['parameters.gc'] G = parameters_pool['parameters.G'] feature_captions = features_pool.descriptorNames() parameter_captions = ['d1','da','g1','gc','G'] svmhmmstr = "" for c in features_pool.descriptorNames(): if c.split('.')[0] == 'metadata' or c.split('.')[0] == 'parameter': feature_captions.remove(c) # close('all') # print "[II] Loaded training data from %s (%s) " % (infile, features_pool['metadata.date'][0]) print "[II] %d Features Available: " % len(feature_captions) print str(feature_captions).replace("', ","\n").replace('[','').replace("'","[II]\t ")[:-7] nfeatures_in = len(feature_captions) nparameters_in = 5 features_vector_one = zeros((nfeatures_in, len(features_pool[feature_captions[0]]))) index_vector_one = ones((len(features_pool[feature_captions[0]]),)) parameters_vector_one = ones((nparameters_in, len(features_pool[feature_captions[0]]))) # paramet for i in range(0, nfeatures_in): features_vector_one[i, :] = features_pool[feature_captions[i]].T # for i in range(0, nparameters_in): # parameters_vector[i, :] = features_pool[parameter_captions[0]].T parameters_vector_one[0, :] = d1*parameters_vector_one[0,:] parameters_vector_one[1, :] = g1*parameters_vector_one[1,:] parameters_vector_one[2, :] = da*parameters_vector_one[2,:] parameters_vector_one[3, :] = gc*parameters_vector_one[3,:] parameters_vector_one[4, :] = G*parameters_vector_one[4,:] # Stores example index number index_vector_one *= idx print "[II] %d parameters used:" % len(parameter_captions) print str(parameter_captions).replace("', ","\n").replace('[','').replace("'","[II]\t ")[:-7].replace('parameter.','') if features_vector is None: features_vector = features_vector_one else: features_vector = append(features_vector, features_vector_one, 1) if parameters_vector is None: parameters_vector = parameters_vector_one else: parameters_vector = append(parameters_vector, parameters_vector_one, 1) if index_vector is None: index_vector = index_vector_one else: index_vector = append(index_vector, index_vector_one) idx += 1 print "[II] Marking silent parts" features_full_nontransformed_train = features_vector.copy() # silent_parts = zeros((1, len(features_pool[feature_captions[i]].T))) silent_parts = zeros((1, features_vector.shape[1])) rms = features_vector[feature_captions.index('rms'), :] # Implementing Hysteresis Gate -- High threshold is halfway between # the mean and the max and Low is halfway between the mean dn the min rms_threshold_mean = mean(rms) rms_threshold_max = max(rms) rms_threshold_min = min(rms) rms_threshold_high = 0.1 * rms_threshold_mean rms_threshold_low = 0.01 * rms_threshold_mean for n in range(1, len(rms)): prev = rms[n-1] curr = rms[n] if prev >= rms_threshold_high: if curr < rms_threshold_low: silent_parts[0,n] = 1 else: silent_parts[0,n] = 0 elif prev <= rms_threshold_low: if curr > rms_threshold_high: silent_parts[0,n] = 0 else: silent_parts[0,n] = 1 else: silent_parts[0,n] = silent_parts[0,n-1] if silent_parts[0,1] == 1: silent_parts[0, 0] = 1 active_index = invert(silent_parts.flatten().astype(bool)) # Keep only active parts # Uncomment this # features_vector = features_vector[:, active_index] # Keep this for debugging purposes # parameters_vector = parameters_vector[:, active_index] # index_vector = index_vector[active_index] # Label examples for a chain of chain_length # idx = 0 # for i in range(0, len(index_vector)): # index_vector[i] = idx # if i % chain_length == 0: # idx += 1 # number_of_examples = idx # Scale parameters to [0,1] parameters_vector[0,:] = (parameters_vector[0,:] - d1min)/d1max #d1 parameters_vector[1,:] = (parameters_vector[1,:] - g1min)/g1max #g1 parameters_vector[2,:] = (parameters_vector[2,:] - damin)/damax #g1 parameters_vector[3,:] = (parameters_vector[3,:] - Gmin)/Gmax #G parameters_vector[4,:] = (parameters_vector[4,:] - gcmin)/gcmax #gc moments_vector = zeros((features_vector.shape[0], 2)) features_vector_original = features_vector.copy() print "[II] Storing moments vector" for i in range(0, features_vector.shape[0]): mean_ = mean(features_vector[i,:]) std_ = std(features_vector[i,:], ddof=1) moments_vector[i,0] = mean_ moments_vector[i,1] = std_ features_vector[i,:] = (features_vector[i,:] - mean_)/std_ features_vector_old_train = features_vector.copy() # moments_vector_train = moments_vector print "[II] Extracting PCA configuration " kernel, q, featurelist = extract_pca_configuration_from_data(features_vector) q = q + 1 print "[II] Optimal number of PCs to keep: %d" % q feature_captions_array = array(feature_captions) features_to_keep = list(feature_captions_array[featurelist]) print "[II] Decided to keep %d features:" % len(features_to_keep) print str(features_to_keep).replace("', ","\n").replace('[','').replace("'","[II]\t ")[:-7] features_kept_data = features_vector[featurelist,:] features_vector_old_train = features_kept_data features_vector = (kernel.T*features_kept_data)[0:q,:] features_vector_new_train = features_vector # example_features = None # example_parameters = None # example_idx = None # # for idx in range(0, shape(parameters_vector)[1]-chain_length): # example_features_ = features_vector[:, idx:idx+chain_length] # # print example_features_ # example_parameters_ = parameters_vector[:, idx:idx+chain_length] # example_idx_ = ones((shape(example_parameters_)[1],)) # # if example_features is None: # example_features = example_features_ # else: # #print "appending" # example_features = append(example_features, example_features_, 1) # # if example_parameters is None: # example_parameters = example_parameters_ # else: # example_parameters = append(example_parameters, example_parameters_, 1) # # if example_idx is None: # example_idx = example_idx_*idx # else: # example_idx = append(example_idx, example_idx_*idx, 1) # # # # # features_vector = example_features # parameters_vector = example_parameters # index_vector = example_idx print "[II] Trying ADM-coded parameters" print "[II] Upsampling features and parameters by a factor of %d" % UpsamplingFactor # Upsampled features and parameters features_vector_upsampled = smooth_matrix_1D(repeat(features_vector,UpsamplingFactor, axis=1)) # feature_labels_upsampled = repeat(features_clustering_labels,UpsamplingFactor, axis=0) parameters_vector_upsampled = repeat(parameters_vector,UpsamplingFactor, axis=1) # km = KMeans(init='k-means++', n_init=10, max_iter=300, tol=0.0000001, verbose = 0).fit(parameters_vector_upsampled.T) # centers_ = km.cluster_centers_ # labels__ = km.labels_ # # Remove the following two lines in order to restore non-kmeans version # parameters_vector_kmeans_upsampled = array(centers_[labels__]) # # parameters_vector_upsampled = parameters_vector_kmeans_upsampled.T # # # parameters_vector_upsampled = smooth_matrix_1D(parameters_vector_upsampled) parameters_vector_upsampled_adm = matrix(zeros(shape(parameters_vector_upsampled))) parameters_vector_upsampled_code = matrix(zeros(shape(parameters_vector_upsampled))) parameters_vector_upsampled_firstval = matrix(zeros((parameters_vector_upsampled.shape[0],1))) # Reconstructed parameters parameters_vector_upsampled_reconstructed = matrix(zeros(shape(parameters_vector_upsampled))) def adm_matrix(X, dmin=0.001,dmax=0.28,tol=0.001): out = matrix(zeros(shape(X))) code = matrix(zeros(shape(X))) firstval = matrix(zeros((X.shape[0], 1))) for i in range(0, X.shape[0]): out[i,:], code[i,:], firstval[i,0] = adm(X[i,:],dmin=dmin,dmax=dmax,tol=tol) return out,code,firstval # parameters_vector_upsampled_reconstructed[i,:] = adm_reconstruct(parameters_vector_upsampled_code[i,:],parameters_vector_upsampled_firstval[i,0], dmin=dmin,dmax=dmax) def adm_matrix_reconstruct(code, firstval, dmin=0.001, dmax=0.28): X = matrix(zeros(shape(code))) for i in range(0, code.shape[0]): X[i,:] = adm_reconstruct(code[i,:], firstval[i,0], dmin=dmin, dmax=dmax) return X parameters_vector_upsampled_adm, parameters_vector_upsampled_code, parameters_vector_upsampled_firstval = adm_matrix(parameters_vector_upsampled, dmin, dmax, tol) def diff_and_pad(X): return concatenate(( zeros(( shape(X)[0], 1 )), diff(X, axis=1)), axis=1) # Change for adm, # parameters_vector_upsampled = parameters_vector_upsampled_code print "[II] Clustering features." # features_clustering = GMM(n_components = n_clusters, covariance_type='diag') # features_clustering.fit( features_vector_upsampled.T, y=parameters_vector_upsampled_code) # features_clustering_means = features_clustering.means_ features_clustering_labels = features_clustering.predict(features_vector_upsampled.T) features_clustering_sigmas = features_clustering.covars_ # features_vector_upsampled_estimated = zeros(shape(features_vector_upsampled)) # # for n in range(0, len(features_vector_upsampled_estimated[0])): features_vector_upsampled_estimated[:,n] = features_clustering_means[features_clustering_labels[n]] # # print "[II] Features MSE for %d clusters: %.3f" % (n_clusters, mse(features_vector_upsampled, features_vector_upsampled_estimated)) def happy_curve_classification(data, classes, estimator, Nd=1): print "[II] Generating Happy Curve " from copy import deepcopy estimator_fulldata = deepcopy(estimator) estimator_fulldata.fit(data, classes) labels = estimator.predict(data) # 1. Split data in two, training and testing Ntr = int(round(data.shape[0]/2)) # Training dataset size Nts = data.shape[0] - Ntr # Testing dataset size ratios = [] for n in range(Nd, Ntr): train, test, trainl, testl = cross_validation.train_test_split(data, classes, test_size = 0.5, random_state = 0) train = train[0:n,:] trainl = trainl[0:n] # print "trainl" # print trainl estimator_ = deepcopy(estimator) estimator_.fit(train,trainl) labels = estimator_.predict(test) ratio = sum(array(testl==labels).astype(float))/len(labels) ratios.append(ratio) return ratios def cross_validate_classification(data, classes, estimator): print "[II] Crossvalidating... " from copy import deepcopy estimator_fulldata = deepcopy(estimator) estimator_fulldata.fit(data, classes) percents = arange(0.1,0.9,0.1) MSEs = [] labels = estimator.predict(data) print "[II] for full training-testing: %.2f" % (sum(array(classes==labels).astype(float))/len(labels)) for p in percents: train,test,trainlabels,testlabels = cross_validation.train_test_split(data,classes,test_size=p,random_state=0) estimator_ = deepcopy(estimator) estimator_.fit(train, trainlabels) labels = estimator_.predict(test) print "[II] for training(%.2f)-testing(%.2f): %.2f" % ((1-p),p,sum(array(testlabels==labels).astype(float))/len(labels)) return MSEs def cross_validate_clustering(data, estimator): print "[II] Crossvalidating... " estimator_fulldata = estimator estimator_fulldata.fit(data) # labels = estimator_fulldata.predict(data) means = estimator_fulldata.means_ # print means percents = arange(0.1,0.6,0.1) MSEs = [] reconstructed = zeros(shape(data)) labels = estimator.predict(data) for n in range(0, len(reconstructed)): reconstructed[n,:] = means[labels[n]] MSEs.append(mse(data,reconstructed)) for p in percents: train,test = cross_validation.train_test_split(data,test_size=p,random_state=0) train = matrix(train) test = matrix(test) estimator.fit(train) means = estimator.means_ labels = estimator.predict(test) reconstructed = zeros(shape(test)) for n in range(0, len(reconstructed)): reconstructed[n,:] = means[labels[n]] m = mse(test,reconstructed) print "[II] MSE for clustering crossvalidated data %.2f-%.2f: %.5f" % ((1-p), p, m) MSEs.append(m) print "[II] Crossvalidation complete" return MSEs # Construct parameters alphabet, each symbol is going to be a different column vector # in parameter code matrix def vector_to_states(X): """ Input: a vector MxN with N samples and M variables Output: a codeword dictionary `parameters_alphabet', state_seq, inverse `parameters_alphabet_inv' """ parameters_alphabet = {} n = 0 for i in range(0, X.shape[1]): vec = tuple(ravel(X[:,i])) if vec not in parameters_alphabet: parameters_alphabet[vec] = n n += 1 parameters_alphabet_inv = dict([(parameters_alphabet[m],m) for m in parameters_alphabet]) state_seq = array([parameters_alphabet[tuple(ravel(X[:,m]))] for m in range(0, parameters_vector_upsampled_code.shape[1])] ) return state_seq, parameters_alphabet, parameters_alphabet_inv def states_to_vector(predicted, parameters_alphabet_inv): estimated = matrix(zeros((len(parameters_alphabet_inv[0]), len(predicted)))) for i in range(0, len(predicted)): # print "[II] Predicted: ", predicted[i] estimated[:, i] = matrix(parameters_alphabet_inv[predicted[i]]).T return estimated # state_seq, parameters_alphabet, parameters_alphabet_inv = vector_to_states(parameters_vector_upsampled_code) # parameters_change_variable = matrix(diff_and_pad(parameters_vector_upsampled)!=0).astype(int) # changes_state_seq, changes_parameters_alphabet, changes_parameters_alphabet_inv = vector_to_states(parameters_change_variable) # This is an hmm that just codes the changes" # We have only two states, change and stay the same. # Uncomment that here # parameters_vector_upsampled = parameters_vector_upsampled_code # parameters_state_orig, parameter_state_alphabet_orig, parameter_state_alphabet_inv_orig = vector_to_states(parameters_vector) # TODO: HA # parmaeters_vector_upsampled = parameters_vector_upsampled_code parameters_state, parameter_state_alphabet, parameter_state_alphabet_inv = vector_to_states(parameters_vector_upsampled) # asdasdasd print "[II] Testing Gaussian Naive Bayes Classifier" gnb = GaussianNB() gnb.fit(features_vector_upsampled.T, parameters_state) parameters_state_estimated = gnb.predict(features_vector_upsampled.T) output = states_to_vector(parameters_state_estimated, parameter_state_alphabet_inv) figure() subplot(211) plot(parameters_vector_upsampled.T) title('Parameter value upsampled by a factor of %d' % UpsamplingFactor) ylabel('value') xlabel('frame #') subplot(212) #plot(smooth_matrix_1D(output).T) plot(output.T) ylabel('value') xlabel('frame #') errorvector_nbc = [mse(output[:,i], parameters_vector_upsampled[:,i]) for i in range(0, len(parameters_state_estimated))] #cross_validate_classification(features_vector_upsampled.T, parameters_state, gnb) # hc = happy_curve_classification(features_vector_upsampled.T, parameters_state, gnb) # # figure() # plot(hc) # figure() print "[II] Trying Multinomial HMM" # In order to do classification with HMMs, we need to: # 1. Split the parameters into classes # 2. Train one model per class # 3. Feed our data to all the models # 4. Check which has a better score and assig,n to M class SVMClassifier: def __init__(self,**kwargs): print "[II] SVM Classifier " # self.clf = svm.SVC(**kwargs) self.name = "SVM" self.clf = multiclass.OneVsOneClassifier(svm.SVC(**kwargs) ) def fit(self, X, classes): n_classes = max(unique(classes))+1 self.clf.fit(X,classes) def predict(self, X): return self.clf.predict(X) def getName(self): return self.name def cross_validate(self, data, classes): print "[II] SVN Classifier Crossvalidating... " from copy import deepcopy estimator = deepcopy(self) estimator_fulldata = deepcopy(self) estimator_fulldata.fit(data, classes) percents = arange(0.1,0.9,0.1) self.percents = percents * 100 RATIOS = [] labels = estimator.predict(data) print "[II] for full training-testing: %.2f" % (sum(array(classes==labels).astype(float))/len(labels)) for p in percents: traindata,testdata,trainlabels,testlabels = cross_validation.train_test_split(data,classes,test_size=p,random_state=1) # print "[II] [%.2f-%.2f] Examples used for training: %s" % (p, (1-p), str(traindata)) # print "[II] [%.2f-%.2f] Examples used for testing: %s" % (p, (1-p), str(testdata)) # Training phase estimator_ = deepcopy(estimator) estimator_.fit(traindata, trainlabels) predicted_labels = estimator_.predict(testdata) m = sum(array(predicted_labels==testlabels).astype(float))/len(testlabels) print "[II] [%.2f-%.2f] 5-fold cross-validation for SVN Classifier: %.1f" % (p, (1-p), m) RATIOS.append(m) return RATIOS,percents class NBClassifier: def __init__(self): print "[II] Gaussian Naive Bayes Classifier" self.name = "Naive Bayes" self.gnb = GaussianNB() def getName(self): return self.name def fit(self, X, states): self.gnb.fit(X, states) def predict(self, X): return self.gnb.predict(X) def cross_validate(self, data, classes): print "[II] SVN Classifier Crossvalidating... " from copy import deepcopy estimator = deepcopy(self) estimator_fulldata = deepcopy(self) estimator_fulldata.fit(data, classes) percents = arange(0.1,0.9,0.1) self.percents = percents * 100 RATIOS = [] labels = estimator.predict(data) print "[II] for full training-testing: %.2f" % (sum(array(classes==labels).astype(float))/len(labels)) for p in percents: traindata,testdata,trainlabels,testlabels = cross_validation.train_test_split(data,classes,test_size=p,random_state=1) # print "[II] [%.2f-%.2f] Examples used for training: %s" % (p, (1-p), str(traindata)) # print "[II] [%.2f-%.2f] Examples used for testing: %s" % (p, (1-p), str(testdata)) # Training phase estimator_ = deepcopy(estimator) estimator_.fit(traindata, trainlabels) predicted_labels = estimator_.predict(testdata) m = sum(array(predicted_labels==testlabels).astype(float))/len(testlabels) print "[II] [%.2f-%.2f] 5-fold cross-validation for NB Classifier: %.1f" % (p, (1-p), m) RATIOS.append(m) return RATIOS,percents class HmmClassifier3: def __init__(self, N=2,n_components = 1): print "[II] HMM Classifier (%d states, %d components)" % (N, n_components) self.name = "HMM (%d time steps, %d components)" % (N, n_components) self.n_components = n_components self.chain_size = N def getName(self): return self.name def fit(self, features, parameters): # print parameters n_classes = max(unique(parameters)) + 1 print "[II] Number of classes: ", n_classes hmms = [None]*n_classes svms = [None]*n_classes chain_size = self.chain_size obs = [None]*n_classes for i in range(chain_size, len(parameters)): class_ = parameters[i-1] seq = features[i-chain_size:i,:] if obs[class_] is None: obs[class_] = [seq] else: obs[class_].append(seq) for i in range(0, len(obs)): if obs[i] is not None and len(obs[i]) != 0: hmm_ = hmm.GaussianHMM(n_components=self.n_components, covariance_type='full') hmm_.fit(obs[i]) hmms[i] = hmm_ self.hmms = hmms return obs def predict(self, features, mfilt=20): chain_size = self.chain_size hmms = self.hmms predicted_classes = zeros((features.shape[0],)) for i in range(chain_size, features.shape[0]): scores = zeros((len(hmms),)) seq = features[i-chain_size:i, :] for j in range(0, len(hmms)): if hmms[j] is not None: scores[j] = hmms[j].score(seq) else: scores[j] = -infty predicted_classes[i] = argmax(scores) # Do a median filter at the end # predicted_classes = median_filter(predicted_classes,mfilt) return predicted_classes def k_fold_cross_validate(self, data, classes, K=5): print "[II] HMM Classifier Crossvalidating... " print "[II] Validating data: ", data print "[II] With classes: ", classes from copy import deepcopy estimator = deepcopy(self) estimator_fulldata = deepcopy(self) estimator_fulldata.fit(data, classes) labels = estimator_fulldata.predict(data) print "[II] for full training-testing: %.2f" % (sum(array(classes==labels).astype(float))/len(classes)) # Crossvalidate this. example_sequences = [] example_labels = [] chain_size = self.chain_size percents = arange(0.1,0.9,0.1) self.percents = percents * 100 RATIOS = [] # Constructing examples and labels L = data.shape[0]/K M = K # print "[II] Example size:", L n = 1 while L*(n-1) < M*L: if L*n > shape(data)[0]: example = data[L*(n-1):,:] classes_ = classes[L*(n-1):] else: example = data[L*(n-1):L*n, :] classes_ = classes[L*(n-1):L*n] # print example # print classes_ example_sequences.append(example) example_labels.append(classes_) n+=1 # print len(example_sequences) # print len(example_labels) from sklearn.cross_validation import KFold kf = KFold(K, n_folds=K) ratio = 0 for train, test in kf: print "[II] Validating examples %s against %s." % (train, test) topredict_data = example_sequences[test[0]] topredict_labels = example_labels[test[0]] tofit_data = None tofit_labels = None for t in train: # print t if tofit_data is None: tofit_data = example_sequences[t] tofit_labels = example_labels[t] else: tofit_data = append(tofit_data, example_sequences[t], 0) tofit_labels = append(tofit_labels, example_labels[t], 0) estimator_ = deepcopy(estimator) estimator_.fit(tofit_data, tofit_labels) labels = estimator_.predict(topredict_data) m = sum(array(topredict_labels==labels).astype(float))/len(labels) print "[II] %d-fold cross-validation for HMM Classifier: %.1f" % (K, m) ratio += m/float(len(kf)) return ratio # Splitting to train/test def cross_validate(self, data, classes): print "[II] HMM Classifier Crossvalidating... " from copy import deepcopy estimator = deepcopy(self) estimator_fulldata = deepcopy(self) estimator_fulldata.fit(data, classes) labels = estimator_fulldata.predict(data) print "[II] for full training-testing: %.2f" % (sum(array(classes==labels).astype(float))/len(classes)) # Crossvalidate this. example_sequences = [] example_labels = [] chain_size = self.chain_size percents = arange(0.5,0.9,0.1) self.percents = percents * 100 RATIOS = [] # Constructing examples and labels M = self.chain_size L = data.shape[0]/20 print "[II] Example size:", L n = 1 while L*n < M*L-L: example = data[L*(n-1):L*n, :] # print example classes_ = classes[L*(n-1):L*n] # print classes_ example_sequences.append(example) example_labels.append(classes_) n+=1 # Splitting to train/test for p in percents: traindata,testdata,trainlabels,testlabels = cross_validation.train_test_split(example_sequences, example_labels, test_size=p, random_state=1) print traindata print testdata # Concatenate traindata tofit_data = None tofit_labels = None topredict_data = None topredict_labels = None for t in traindata: if tofit_data is None: tofit_data = t else: tofit_data = append(tofit_data, t, 0) for l in trainlabels: if tofit_labels is None: tofit_labels = l else: tofit_labels = append(tofit_labels, l) for t in testdata: if topredict_data is None: topredict_data = t else: topredict_data = append(topredict_data, t, 0) for l in testlabels: if topredict_labels is None: topredict_labels = l else: topredict_labels = append(topredict_labels, l) # print "[II] Fit data: ", tofit_data ## print "[II] Fit labels: ", tofit_labels # print "[II] Predict data: ", topredict_data # print "[II] Predict labels: ", topredict_labels # estimator_ = deepcopy(estimator) print tofit_labels estimator_.fit(tofit_data, tofit_labels) labels = estimator_.predict(topredict_data) m = sum(array(topredict_labels==labels).astype(float))/len(labels) print "[II] [%.2f-%.2f] cross-validation for HMM Classifier: %.1f" % (p, (1-p), m) RATIOS.append(m) return RATIOS,percents class HmmClassifier2: def __init__(self, N=2, chain_size=4, n_components = 1): print "[II] HMM Classifier (%d states, %d components)" % (N, n_components) self.name = "HMM (%d states, %d components)" % (N, n_components) self.n_components = n_components self.chain_size = chain_size self.hmms_ = [] self.N = N self.n_states = N def getName(self): return self.name def fit(self, features, parameters): self.n_params = len(unique(parameters)) for n in range(0, self.n_params): hmm_ = hmm.GaussianHMM(n_components = self.n_components, covariance_type='full') # Get training data for each parameter for i in range(0, len(parameters)-self.chain_size): seq = features[i:i+self.chain_size, :] if parameters[i+self.chain_size-1] == n and seq.shape[0] == self.chain_size: # print "[II] Fitting: %s" % str(seq) hmm_.fit([seq]) self.hmms_.append(hmm_) def predict(self, X): labels = zeros((X.shape[0],)) N = self.N scores = zeros((self.n_states, )) for i in range(0, len(labels)): testdata = X[i:i+self.chain_size,:] for j in range(0, self.n_states): scores[j] = self.hmms_[j].score(testdata) labels[i] = argmax(scores) return labels def cross_validate(self, data, classes): print "[II] SVN Classifier Crossvalidating... " from copy import deepcopy estimator = deepcopy(self) estimator_fulldata = deepcopy(self) estimator_fulldata.fit(data, classes) percents = arange(0.1,0.9,0.1) self.percents = percents * 100 RATIOS = [] labels = estimator.predict(data) print "[II] for full training-testing: %.2f" % (sum(array(classes==labels).astype(float))/len(labels)) for p in percents: traindata,testdata,trainlabels,testlabels = cross_validation.train_test_split(data,classes,test_size=p,random_state=1) # print "[II] [%.2f-%.2f] Examples used for training: %s" % (p, (1-p), str(traindata)) # print "[II] [%.2f-%.2f] Examples used for testing: %s" % (p, (1-p), str(testdata)) # Training phase estimator_ = deepcopy(estimator) estimator_.fit(traindata, trainlabels) predicted_labels = estimator_.predict(testdata) m = sum(array(predicted_labels==testlabels).astype(float))/len(testlabels) print "[II] [%.2f-%.2f] 5-fold cross-validation for HMM2 Classifier: %.1f" % (p, (1-p), m) RATIOS.append(m) return RATIOS,percents class HmmClassifier: def __init__(self, N=2, n_components = 1): print "[II] HMM Classifier (%d states, %d components)" % (N, n_components) self.name = "HMM (%d states, %d components)" % (N, n_components) self.n_components = n_components self.chain_size = N self.hmms_ = [] self.N = N def getName(self): return self.name def fit(self, X, states): self.n_states = len(unique(states)) for n in range(0, self.n_states): hmm_ = hmm.GaussianHMM(n_components = self.n_components, covariance_type = 'full') # Get training data for each class vector = X[states == n,:] # Fit the HMM # print vector hmm_.fit([vector]) # And append to the list self.hmms_.append(hmm_) def predict(self,X): labels = zeros((X.shape[0],)) N = self.N m = 0 scores = zeros((1, self.n_states)) while m*N < X.shape[0]: if (m+1)*N > X.shape[0]: testdata = X[m*N:,:] else: testdata = X[m*N:(m+1)*N,:] # print testdata for i in range(0, self.n_states): scores[0,i] = self.hmms_[i].score(testdata) if (m+1)*N > X.shape[0]: labels[m*N:] = argmax(scores) else: labels[m*N:(m+1)*N] = argmax(scores) m+= 1 return labels # def cross_validate(self, data, classes, index_vector): # print "[II] Crossvalidating... " # from copy import deepcopy # estimator = deepcopy(self) # estimator_fulldata = deepcopy(self) # estimator_fulldata.fit(data, classes) # # percents = arange(0.1,0.9,0.1) # self.percents = percents * 100 # RATIOS = [] # labels = estimator.predict(data) # # print "[II] for full training-testing: %.2f" % (sum(array(classes==labels).astype(float))/len(labels)) # # for p in percents: # train,test,trainlabels,testlabels = cross_validation.train_test_split(data,classes,test_size=p,random_state=1) # estimator_ = deepcopy(estimator) # estimator_.fit(train, trainlabels) # labels = estimator_.predict(test) # m = sum(array(testlabels==labels).astype(float))/len(labels) # RATIOS.append(m) # print "[II] for training(%.2f)-testing(%.2f): %.2f" % ((1-p),p,sum(array(testlabels==labels).astype(float))/len(labels)) # # return RATIOS, percents def cross_validate(self, data, classes): print "[II] SVN Classifier Crossvalidating... " from copy import deepcopy estimator = deepcopy(self) estimator_fulldata = deepcopy(self) estimator_fulldata.fit(data, classes) percents = arange(0.1,0.9,0.1) self.percents = percents * 100 RATIOS = [] labels = estimator.predict(data) print "[II] for full training-testing: %.2f" % (sum(array(classes==labels).astype(float))/len(labels)) for p in percents: traindata,testdata,trainlabels,testlabels = cross_validation.train_test_split(data,classes,test_size=p,random_state=1) # print "[II] [%.2f-%.2f] Examples used for training: %s" % (p, (1-p), str(traindata)) # print "[II] [%.2f-%.2f] Examples used for testing: %s" % (p, (1-p), str(testdata)) # Training phase estimator_ = deepcopy(estimator) estimator_.fit(traindata, trainlabels) predicted_labels = estimator_.predict(testdata) m = sum(array(predicted_labels==testlabels).astype(float))/len(testlabels) # testdata = None # # traindata = None # trainlabels = None # testlabels = None # # for t in train: # if traindata is None: # traindata = data[index_vector == t, :] # trainlabels = classes[index_vector == t] # else: # traindata = append(traindata, data[index_vector == t, :], 0) # trainlabels = append(trainlabels, classes[index_vector ==t]) # # estimator_ = deepcopy(estimator) # estimator_.fit(traindata, trainlabels) # # # for t in test: # if testdata is None: # testdata = data[index_vector == t, :] # testlabels = classes[index_vector == t] # else: # testdata = append(testdata, data[index_vector == t,:], 0) # testlabels = append(testlabels, classes[index_vector == t]) # predicted_labels = estimator_.predict(testdata) m = sum(array(predicted_labels==testlabels).astype(float))/len(testlabels) print "[II] [%.2f-%.2f] 5-fold cross-validation for HMM Classifier: %.1f" % (p, (1-p), m) RATIOS.append(m) return RATIOS,percents def cross_validate_full_k_fold(data, classes, list_of_classifiers, K=10): """ K-Folds Cross-validates all available classifiers, plots and saves barplots. """ ratios = [] for l in list_of_classifiers: ratio = l.k_fold_cross_validate(data, classes, K) ratios.append(ratio) L = len(ratios) ind = arange(L) fig, ax = subplots() rects = [] width = 10/float(len(ratios)) colors = ['r', 'y', 'g', 'c'] labels_ = [] for i in range(0, len(ratios)): rects.append(ax.bar(i*width,ratios[i],width,color=colors[i % len(colors)])) labels_.append(list_of_classifiers[i].getName()) return ratios def cross_validate_full(seq, classes, list_of_classifiers): """ Cross-validates all available classifiers, plots and saves barplots. """ ratios = [] percents = [] for l in list_of_classifiers: ratios_, percents_ = l.cross_validate(seq, classes) ratios.append(ratios_) percents.append(percents_) L = len(ratios[0]) ind = arange(L) fig,ax = subplots() rects = [] W = 0.8 width = W/float(len(ratios)) colors = ['r', 'y', 'g', 'c'] labels_ = [] for i in range(0, len(ratios)): rects.append(ax.bar(ind+i*width,ratios[i],width,color=colors[i % len(colors)])) labels_.append(list_of_classifiers[i].getName()) ax.legend(tuple(labels_)) ax.set_xticks(ind+W/2) ax.set_xticklabels(tuple(percents[0]*100)) ax.set_xlabel("Percentage % of test data") ax.set_ylabel("Success ratio") # rects1 = ax.bar(ind,ratios[0],0.35,color='r') # rects2 = ax.bar(ind+0.351,ratios[1],0.35,color='y') return ratios # hmmc = HmmClassifier(N = len(unique(parameters_state)), n_components = 1) # hmmc.fit(features_vector_upsampled.T, parameters_state) # # hmmc2 = HmmClassifier(N = 12, n_components = 4) # hmmc2.fit(features_vector_upsampled.T, parameters_state) # svmc = SVMClassifier(probability=True) svmc.fit(features_vector_upsampled.T, parameters_state) # # svmc.cross_validate(features_vector_upsampled.T, parameters_state, index_vector) # nbc = NBClassifier() nbc.fit(features_vector_upsampled.T, parameters_state) # # #hchmmc = happy_curve_classification(features_vector_upsampled.T, parameters_state, hmmc, Nd=100) # # cross_validate_classification(features_vector_upsampled.T, parameters_state, hmmc) # # #hmmc.cross_validate(features_vector_upsampled.T, parameters_state) # hmmc3 = HmmClassifier3(N=6, n_components = max(unique(parameters_state))) obs = hmmc3.fit(features_vector_upsampled.T, parameters_state) # svmhmm = HmmSVMClassifier() # svmhmm.fit(features_vector_upsampled.T, parameters_state) # ratios = cross_validate_full(features_vector_upsampled.T, parameters_state, [nbc, svmc]) # ratios = cross_validate_full_k_fold(features_vector_upsampled.T, parameters_state, # [hmmc3]) def approximate_chain_size(data,parameters): ratios = [] # chainsizes = arange(1, 40) # for cs in chainsizes: # print "[II] Trying chain size: %d" % cs # hmmc = HmmClassifier3(N=cs, n_components = 2) # ratios.append(hmmc.k_fold_cross_validate(data,parameters,K=10)) componentssize = arange(1, 12) for cs in componentssize: print "[II] Trying component size: %d" % cs hmmc = HmmClassifier3(N=6, n_components = cs) ratios.append(hmmc.k_fold_cross_validate(data, parameters, K=2)) figure() plot(componentssize, ratios) xlabel('Chain Size') ylabel('Success Ratio') title('10-Fold cross validation success ratio vs chain size') return ratios # ratios = approximate_chain_size(features_vector_upsampled.T, parameters_state) #predictedhmmc2 = hmmc2.predict(features_vector_upsampled.T) # figure() # hmms_ = [] # # for n in range(0, len(parameter_state_alphabet)): # #hmm_ = hmm.GMMHMM(n_components = 1, n_mix = 2) # hmm_ = hmm.GaussianHMM(n_components = 1,covariance_type = 'full') # # # Get training data for each class # vector = features_vector_upsampled[:,parameters_state == n] # # #if vector.shape[1] < n_clusters: # # hmms_.append(None) # #else: # # hmm_.fit([vector.T]) # # # Append to the list # # hmms_.append(hmm_) # # labels = zeros((features_vector_upsampled.shape[1],)) # # N = 20 # m = 0 # # while m*N < features_vector_upsampled.shape[1]: # # scores = zeros((1, len(parameter_state_alphabet))) # # if (m+1)*N > features_vector_upsampled.shape[1]: # testdata = features_vector_upsampled[:,m*N:] # else: # testdata = features_vector_upsampled[:,m*N:(m+1)*N] # # for i in range(0, len(parameter_state_alphabet)): # if hmms_[i] is not None: # scores[0,i] = hmms_[i].score(testdata.T) # else: # scores[0,i] = -100000 # Very large negative score # if (m+1)*N >= features_vector_upsampled.shape[1]: # labels[m*N:] = argmax(scores) # else: # labels[m*N:(m+1)*N] = argmax(scores) # # m += 1 # figure() #plot(labels.T) # labels = hmmc.predict(features_vector_upsampled.T) # estimated = states_to_vector(labels,parameter_state_alphabet_inv) # plot(estimated.T,'r--') # # title('Estimated parameter values') # legend(['Naive Bayes Classifier', 'HMM chain size %d (%.1fms)' % (N, float(N)/UpsamplingFactor*23.0)]) # # ylabel('value') # xlabel('frame #') # # close('all') # plot(features_clustering_labels/float(max(features_clustering_labels))) # plot(parameters_vector_upsampled.T/max(ravel(parameters_vector_upsampled))) def plot_clusters(x, labels): figure() symbols = ['>', 'x', '.', '<','v'] colors = ['b', 'r', 'g', 'm','c'] for r in range(0, x.shape[0]): scatter(x[r,0], x[r,1], c=colors[int(labels[r]) % len(colors)], marker=symbols[int(labels[r]) % len(symbols)]) # plot_clusters(features_vector_upsampled.T, parameters_state) # # SVM class HmmClassifierSVN_Naive: def __init__(self, N=2, n_components = 1): self.n_components = n_components self.chain_size = N self.hmms_ = [] self.N = N def fit(self, X, states): self.n_states = len(unique(states)) for n in range(0, self.n_states): hmm_ = hmm.GaussianHMM(n_components = self.n_components, covariance_type = 'full') # Get training data for each class vector = X[states == n,:] # Fit the HMM # print vector hmm_.fit([vector]) # And append to the list self.hmms_.append(hmm_) def predict(self,X): labels = zeros((X.shape[0],)) N = self.N m = 0 scores = zeros((1, self.n_states)) while m*N < X.shape[0]: if (m+1)*N > X.shape[0]: testdata = X[m*N:,:] else: testdata = X[m*N:(m+1)*N,:] # print testdata for i in range(0, self.n_states): scores[0,i] = self.hmms_[i].score(testdata) if (m+1)*N > X.shape[0]: labels[m*N:] = argmax(scores) else: labels[m*N:(m+1)*N] = argmax(scores) m+= 1 return labels def plot_parameters_estimation(list_of_estimators): for e in list_of_estimators: name_ = e.getName() fig = figure() fig.suptitle('Parameters estimation using %s' % name_, fontweight='bold') subplot(311) title ('original parameters') param_real = states_to_vector(parameters_state,parameter_state_alphabet_inv).T plot(param_real) legend(parameter_captions) subplot(312) title('estimated parameters') pred = e.predict(features_vector_upsampled.T) param_est = states_to_vector(pred,parameter_state_alphabet_inv).T plot(param_est) legend(parameter_captions) subplot(313) error_real_est = zeros((len(param_est),)) for i in range(0, len(error_real_est)): error_real_est[i] = mse(param_real[i],param_est[i]) totalmse = sum(error_real_est) title('mean squared error (total: %.3f)' % totalmse) plot(error_real_est) plot_parameters_estimation([nbc, svmc, hmmc3]) class HmSVMClassifier: def __init__(self, N=2,n_components = 1): print "[II] HMM Classifier (%d states, %d components)" % (N, n_components) self.name = "HMM (%d time steps, %d components)" % (N, n_components) self.n_components = n_components self.chain_size = N def getName(self): return self.name def fit(self, features, parameters, fname_training = "trainingdat.dat", fname_model = "training.model", C = 689): svmhmm = "" idx = 0 chain_size = self.chain_size for i in range(chain_size, len(parameters)): idx += 1 class_ = parameters[i-1] seq = features[i-chain_size:i,:] for m in range(0, len(seq)): s = seq[m] svmhmm += "%d qid:1.%d " % (class_, idx) for j in range(0, len(s)): svmhmm += "%d:%.4f " % (j+1, s[j]) svmhmm += "\n" fileout = open(fname_training, 'w') fileout.write(svmhmm) fileout.close() import subprocess # C = idx subprocess.call(["svmhmm/svm_hmm_learn", '-c', '%d' % C, '-e', '0.5', fname_training, fname_model]) return svmhmm n_classes = max(unique(parameters_state)) + 1 hist_p_q = histogram(parameters_state, n_classes)[0].astype(float) prior_p_q = hist_p_q/sum(hist_p_q) transmat = zeros((n_classes,n_classes)) for i in range(1, len(parameters_state)): prev = parameters_state[i-1] cur = parameters_state[i] transmat[prev,cur] += 1 transmat = transmat/sum(transmat) hmm2 = hmm.MultinomialHMM(n_components = n_classes, startprob = prior_p_q, transmat = transmat ) # TODO: Embed SVM emissions to HMM. Try to do pick maximum likelihood sequence from # classifiers like in HmmClassifier3. For every class, train an SVM, with n_components as # states, and compute the log-likelihood for prediction. Should work. class MyAVAClassifier: def __init__(self): self.classifiers = {} self.name = "Linear SVM Classifier" # self.probabilities = {} def fit(self, X, y, flr = 0): n_classes = max(unique(y)) + 1 classes = arange(0, n_classes) self.n_classes = n_classes # M = n_classes*(n_classes-1) # Number of classifiers h = histogram(y, n_classes)[0].astype(float) self.prior = h/sum(h) transmat = zeros((n_classes, n_classes)) for i in range(1, len(y)): prev = y[i-1] cur = y[i] transmat[prev,cur] += 1 transmat = transmat/sum(transmat) self.transmat = transmat # Add a very small probability for random jump to avoid zero values self.transmat += flr self.transmat = self.transmat/sum(self.transmat) for i in range(0, n_classes): for j in range(0, n_classes): if i != j and (i,j) not in self.classifiers and (j,i) not in self.classifiers: print (i,j) idx_ = bitwise_or(y == classes[i], y == classes[j]) X_ = X[idx_, :] y_ = y[idx_] svm_ = svm.SVC(probability = True) # print y_ svm_.fit(X_, y_) self.classifiers[(i,j)] = svm_ # self.probabilities[(i,j)] = svm_.predict_proba(X) def estimate_pairwise_class_probability(self, i, j, x): if (i,j) not in self.classifiers and (j,i) in self.classifiers: return self.classifiers[(j,i)].predict_proba(x)[0,1] else: return self.classifiers[(i,j)].predict_proba(x)[0,0] def estimate_posterior_probability(self, i, x): mus = zeros((self.n_classes,)) for j in range(0, self.n_classes): if i != j: mus[j] = 1/self.estimate_pairwise_class_probability(i,j,x) # print mus S = sum(mus) - (self.n_classes - 2) # print S return 1/S def estimate_posterior_probability_vector(self, x): posterior = zeros((self.n_classes,)) for i in range(0, len(posterior)): posterior[i] = self.estimate_posterior_probability(i, x) return posterior # def estimate_emission_probability(self, i, x): # post = self.estimate_posterior_probability_vector(x) # # print "post: ", post # prior = self.prior # # print "prior: ", prior # p = post/prior # p = p/sum(p) # # return p[i] # def estimate_emissmat(self, X): # emissmat = zeros((X.shape[0], self.n_classes)) # for i in range(0, X.shape[0]): # for j in range(0, self.n_classes): # emissmat[i,j] = self.estimate_emission_probability(j, X[i,:]) # # return emissmat def predict(self, X): predicted = zeros((X.shape[0],)) for i in range(0, X.shape[0]): x = X[i,:] P = zeros((self.n_classes,)) for c in range(0, len(P)): P[c] = self.estimate_posterior_probability(c, x) pred = argmax(P) predicted[i] = pred return predicted class HMMsvmClassifier: def __init__(self, N=2): print "[II] HMM-SVM Classifier (%d states)" % N self.name = "HMM-SVM (%d time steps)" % N # self.n_components = n_components self.chain_size = N self.svm = MyAVAClassifier() def getName(self): return self.name def fit(self, features, parameters): # First train emission svm self.svm.fit(features, parameters) # print parameters n_classes = max(unique(parameters)) + 1 print "[II] Number of classes: ", n_classes hmms = [None]*n_classes chain_size = self.chain_size params = [None]*n_classes obs = [None]*n_classes for i in range(chain_size, len(parameters)): class_ = parameters[i-1] params_ = parameters[i-chain_size:i] feats_ =features[i-chain_size:i,:] if params[class_] is None: params[class_] = [params_] obs[class_] = [feats_] else: params[class_].append(params_) obs[class_].append(feats_) for i in range(0, len(params)): if params[i] is not None and len(params[i]) != 0: hmm_ = HMMsvm(self.svm) print "[II] %d Fitting params: " % i, params[i] hmm_.fit(obs[i], params[i]) # TODO: Left here on 06/07 19:56 # hmm_.fit(features,) # print "obs: ", obs[i] # print "params: ", params[i] # hmm_.fit(obs[i], params[i],flr=1e-9) hmms[i] = hmm_ self.hmms = hmms return obs def predict(self, features, mfilt=20): chain_size = self.chain_size hmms = self.hmms predicted_classes = zeros((features.shape[0],)) for i in range(chain_size, features.shape[0]): scores = zeros((len(hmms),)) seq = features[i-chain_size:i, :] for j in range(0, len(hmms)): if hmms[j] is not None: scores[j] = hmms[j]._log_likelihood(seq) else: scores[j] = -infty predicted_classes[i] = argmax(scores) # Do a median filter at the end # predicted_classes = median_filter(predicted_classes,mfilt) return predicted_classes class HMMsvm: def __init__(self, svm_): self.svm = svm_ # self.svm = MyAVAClassifier() def fit(self, features_list, states, flr=1e-12): # TODO: Concatenate features, train #self.svm.fit(X, states, flr) #self.prior = self.svm.prior #self.transmat = self.svm.transmat features = None for f in features_list: if features is None: features = f else: features = append(features, f, 0) fullstates = None T = features.shape[0] for s in states: if fullstates is None: fullstates = s else: fullstates = append(fullstates, s) self.n_classes = self.svm.n_classes n_classes = self.n_classes print fullstates, shape(fullstates) h = histogram(fullstates, n_classes)[0].astype(float) self.prior = h/sum(h) # Add a very small probability for random jump self.prior += flr self.prior = self.prior/sum(self.prior) transmat = zeros((n_classes, n_classes)) transitions = zeros((n_classes, )) for s in states: for i in range(1, len(s)): prev = s[i-1] cur = s[i] transmat[prev,cur] += 1 transitions[prev] += 1 # print "Adding one to", (prev,cur) transitions[transitions == 0] = 1 for i in range(0, transmat.shape[0]): transmat[i,:] = transmat[i,:]/transitions[i] self.transmat = transmat # Add a very small probability for random jump to avoid zero values self.transmat += flr self.transmat = self.transmat/sum(self.transmat) # Alphas and Betas X = features alphas = zeros((T,n_classes)) betas = zeros((T,n_classes)) forward_messages = zeros((T,n_classes)) backward_messages = zeros((T, n_classes)) print "[II] Computing alpha, beta..." for t in range(1, T+1): forward_messages[t-1,:] = self._forward_message(forward_messages[t-2,:],X[0:t,]) backward_messages[-t,:] = self._backward_message(backward_messages[-t+1], X[-t:,:]) print "[II] Computing ksu..." a_norm = forward_messages[-1,n_classes-1] # alpha_T ksu = zeros((T, n_classes, n_classes)) for i in range(0, n_classes): for j in range(0, n_classes): for t in range(0, T-1): ksu[t,i,j] = exp(forward_messages[t, i] + log(transmat[i,j]) + log(self.estimate_emission_probability(X[t,:],j)) + backward_messages[t+1,j] - a_norm) self.alphas = forward_messages self.betas = backward_messages self.ksu = ksu transmat_new = zeros((n_classes,n_classes)) for i in range(0, n_classes): for j in range(0, n_classes): transmat_new[i,j] = sum(ksu[:,i,j])/sum(ksu[:,i,:]) self.transmat_new = transmat_new # print observations def estimate_emission_probability(self, x, q): post = self.svm.estimate_posterior_probability_vector(x) # print "post: ", post prior = self.prior # print "prior: ", prior p = post/prior p = p/sum(p) return p[q] # def estimate_emission_probability(self, x, q): # return self.svm.estimate_emission_probability(q, x) def logviterbi(self, X): # Number of states N = self.n_classes # Number of observations T = X.shape[0] transmat = self.transmat #1. Initalization a1 = self.prior delta = zeros((N,)) psi = zeros((N,)) for i in range(0, N): delta[i] = log(a1[i]) + log(self.estimate_emission_probability(X[0,:],i)) #2. Recursion for t in range(1, T): delta_old = delta.copy() x = X[t, :] for j in range(0, N): delta[j] = max(delta_old + log(transmat[i,j])) psi[j] = argmax(delta_old + log(transmat[i,j])) # 3. Termination p_star = max(delta + log(transmat[:,N-1])) q_star = argmax(delta + log(transmat[:, N-1])) # 4. Backtracking q = zeros((T,)) q[-1] = q_star for t in range(1, T): q[-t-1] = psi[q[-t]] return q def viterbi(self, X): # Number of states N = self.n_classes # Number of observations T = X.shape[0] transmat = self.transmat #1. Initialization a1 = self.prior delta = zeros((N, )) psi = zeros((N, )) for i in range(0, N): delta[i] = a1[i]*self.estimate_emission_probability(X[0,:],i) #2. Recursion for t in range(1, T): delta_old = delta.copy() x = X[t,:] for j in range(0, N): delta[j] = max(delta_old*transmat[:,j])*self.estimate_emission_probability(x, j) psi[j] = argmax(delta_old*transmat[:,j]) #3. Termination p_star = max(delta*transmat[:,N-1]) q_star = argmax(delta*transmat[:,N-1]) #4. Backtracking q = zeros((T,)) q[-1] = q_star for t in range(1, T): q[-t-1] = psi[q[-t]] return q def _log_likelihood_matrix(self, X): N = self.n_classes ll = zeros((X.shape[0], N)) for i in range(0, X.shape[0]): ll[i,:] = self._forward(i, X) return ll def compute_ksus(self, X): N = self.n_classes T = X.shape[0] print "[II] Computing gammas... " gammas = self._forward_backward(X) # Initialize aij = self.transmat def _not_quite_ksu(self, t, i, j, X): alpha = exp(self.forward(X[0:t+1,:]))[i] beta = exp(self.backward(X[t+1:,:]))[j] aij = self.transmat[i,j] bj = self.estimate_emission_probability(X[t+1,:], j) return alpha*beta*aij*bj def _ksu(self, t, i, j, X): alphaT = exp(self.forward(X[0:t+1,:]))[0] return self._not_quite_ksu(t,i,j,X) def _forward_backward(self, X): T = X.shape[0] N = self.n_classes fv = zeros((T, N)) sv = zeros((T, N)) b = None for t in range(1, T+1): fv[t-1,:] = self._forward_message(fv[t-2,:], X[0:t, ]) for t in range(1, T+1): b = self._backward_message(b, X[-t: , :]) sv[-t,:] = lognorm(fv[t-1,:]*b) return sv def _backward(self, t, X): N = self.n_classes a = ones((N,))/N beta = zeros((N, )) transmat = self.transmat T = X.shape[0] x = X[t, :] if t == T-1: beta = log(a) return beta else: tosum = zeros((N, )) bt = self._backward(t+1, X) btnew = zeros((N, )) # print bt for j in range(0, N): for i in range(0, N): # print log(self.estimate_emission_probability(x, j)), bt[i], log(transmat[i,j]) tosum[i] = log(self.estimate_emission_probability(x, j)) + bt[i] + log(transmat[i,j]) # print tosum btnew[j] = logsumexp(tosum) btnew = lognorm(btnew) return btnew def forward(self, X): T = X.shape[0] f = None for t in range(1, T+1): f = self._forward_message(f, X[0:t, :]) return f def backward(self, X): T = X.shape[0] b = None for t in range(1,T+1): # print t # print t,b idx = arange(-t,T) # print idx b = self._backward_message(b, X[-t:, :]) return b def _backward_message(self, b, X): N = self.n_classes beta = zeros((N, )) transmat = self.transmat x = X[0, :] if X.shape[0] == 1: beta = log(ones((N,))/N) return beta else: tosum = zeros((N, )) btnew = zeros((N, )) for j in range(0, N): for i in range(0, N): tosum[i] = log(self.estimate_emission_probability(x,j)) + b[i] + log(transmat[i,j]) btnew[j] = logsumexp(tosum) btnew = lognorm(btnew) return btnew def _forward_message(self, f, X): N = self.n_classes a = self.prior alpha = zeros((N, )) transmat = self.transmat x = X[-1, :] if X.shape[0] == 1: for i in range(0, N): alpha[i] = log(a[i]) + log(self.estimate_emission_probability(x, i)) alpha = lognorm(alpha) return alpha else: tosum = zeros((N,)) ftnew = zeros((N,)) for j in range(0, N): for i in range(0, N): tosum[i] = f[i] + log(transmat[i,j]) Sum = logsumexp(tosum) bj = self.estimate_emission_probability(x, j) ftnew[j] = Sum+log(bj) ftnew = lognorm(ftnew) return ftnew def _forward(self, t, X): N = self.n_classes a = self.prior # print a alpha = zeros((N, )) transmat = self.transmat x = X[t, :] if t == 0: for i in range(0, N): alpha[i] = log(a[i]) + log(self.estimate_emission_probability(x, i)) # print "--" # print alpha alpha = lognorm(alpha) # print alpha # print "xx" return alpha else: tosum = zeros((N, )) ft = self._forward(t-1, X) ftnew = zeros((N, )) for j in range(0, N): for i in range(0, N): # print ft tosum[i] = ft[i] + log(transmat[i,j]) Sum = logsumexp(tosum) bj = self.estimate_emission_probability(x, j) ftnew[j] = Sum+log(bj) ftnew = lognorm(ftnew) return ftnew # # def forward(self, X): # # Compute log likelihood using the forward algorithm # # # Number of states # N = self.n_classes # # # Number of Observations # T = X.shape[0] # # # transmat = self.transmat # # # Initialization # # a1 = ones((N,))/N # a1 = self.prior # # alpha = zeros((N,)) # for i in range(0, N): # alpha[i] = log(a1[i])+log(self.estimate_emission_probability(X[0,:], i)) # # print alpha # # Recursion # for t in range(0, T): # alpha_old = alpha.copy() # x = X[t, :] # for j in range(0, N): # tosum = zeros((N,)) # for i in range(0, N): # tosum[i] = alpha_old[i]+log(transmat[i,j]) # # # Sum = logsum(tosum) # Sum = logsumexp(tosum) # bj = self.estimate_emission_probability(x, j) # # alpha[j] = Sum+log(bj) # # print alpha # # tosum = zeros((N,)) # for i in range(0, N): # tosum[i] = alpha[i] + log(transmat[i,N-1]) # # return tosum # # def backward(self, X): # # Number of states # N = self.n_classes # # # Number of Observations # T = X.shape[0] # # transmat = self.transmat # # # Initialization # # b1 = ones((N,))/N # # beta = zeros((N,)) # for i in range(0, N): # beta[i] = log(b1[i])+log(self.estimate_emission_probability(X[-1,:], i)) # # for t in range(0, T): # beta_old = beta.copy() # x = X[-t, :] # for j in range(0, N): # tosum = zeros((N, )) # for i in range(0, N): # tosum[i] = beta_old[i]+log(transmat[i,j]) # # Sum = logsumexp(tosum) # bj = self.estimate_emission_probability(x, j) # beta[j] = Sum+log(bj) # # tosum = zeros((N,)) # # for i in range(0, N): # tosum[i] = beta[i] + log(transmat[i,0]) # # #3 p = logsumexp(tosum) # # return tosum def _log_likelihood(self, X): return logsumexp(self.forward(X)) def _likelihood(self, X): # Number of states N = self.n_classes # Number of Observations T = X.shape[0] transmat = self.transmat # Initialization # a1 = ones((N,))/N a1 = self.prior alpha = zeros((N,)) for i in range(0, N): alpha[i] = a1[i]*self.estimate_emission_probability(X[0,:], i) # Recursion print alpha for t in range(1, T): alpha_old = alpha.copy() x = X[t, :] for j in range(0, N): Sum = 0 for i in range(0, N): Sum += alpha_old[i]*transmat[i,j] alpha[j] = Sum*self.estimate_emission_probability(x, j) print alpha # Termination Sum = 0 for i in range(0, N): Sum += alpha[i]*transmat[i, N-1] p = Sum return p # def fit(self, X, states): # # print parameters # n_classes = max(unique(states)) + 1 # # # svms = [None]*n_classes # obs = [None]*n_classes # sta = [None]*n_classes # chain_size = self.chain_size # # #22 obs = None # # sta = None # # print "[II] Number of classes: ", n_classes # # # For each class: # # Concatenate examples # # Fit SVM # # for i in range(chain_size, len(states)): # class_ = states[i-1] # seq = X[i-chain_size:i, :] # states_ = states[i-chain_size:i] # # # if obs[class_] is None: # obs[class_] = [seq] # sta[class_] = [states_] # self.svms.append(MyAVAClassifier()) # else: # obs[class_].append(seq) # sta[class_].append(states_) # # # for i in range(0, len(obs)): # # o = None # s = None # # for j in range(0, len(obs[i])): # if o is None: # o = obs[i][j] # s = sta[i][j] # # else: # o = append(o, obs[i][j],0) # s = append(s, sta[i][j]) # # # self.svms[i].fit(o, s) # def predict(self, features): # chain_size = self.chain_size # svms = self.svms # # predicted_classes = zeros((features.shape[0],)) # for i in range(chain_size, features.shape[0]): # scores = zeros((len(svms))) # # seq = features[i-chain_size:i, :] # # for j in range(0, len(svms)): # if svms[j] is not None: # scores[j] = svms[j].compute_log_likelihood(seq) # else: # scores[k] = -infty # predicted_classes[i] = argmax(scores) # # return predicted_classes # Marginalize over the latent variable # for i in range(0, X.shape[0]): # P = zeros((self.n_classes,)) # x = X[i,:] # for j in range(0, len(P)): # P[j] = self.estimate_emission_probability(j, x) # # S[i] = log(sum(P[j])) # # return sum(S) # Continue from here X = features_vector_upsampled.T y = parameters_state clf = svm.SVC() clf.fit(X,y) parameters_state_y = clf.predict(X) predhmmc3 = hmmc3.predict(features_vector_upsampled.T) myava = MyAVAClassifier() myava.fit(features_vector_upsampled.T, parameters_state) predmyava = myava.predict(features_vector_upsampled.T) # hmmsvmc = HMMsvmClassifier(N=2) # hmmsvmc.fit(features_vector_upsampled.T, parameters_state) #predhmmsvmc = hmmsvmc.predict(features_vector_upsampled.T) #plot(parameters) figure() plot(parameters_state,'b') plot(parameters_state_y,'r--') plot(predhmmc3,'g+-') # plot(predhmmsvmc, 'bo-') legend(['Real', 'SVM', 'HMM3','HMM-SVM']) # TODO, HMM with SVN, Cross-validation # Using hidden markov svm svmhmm = "" print "[II] Success ratio for SVN: %.2f" % (sum(parameters_state==parameters_state_y).astype(float)/float(len(parameters_state))) print "[II] Success ratio for HMM: %.2f" % (sum(parameters_state==predhmmc3).astype(float)/float(len(predhmmc3)) ) print "[II] All-vs-All custom Support Vector Classifier Success Ratio: %.2f " %(sum(predmyava==parameters_state).astype(float)/len(parameters_state)) model_ghmm = ReverbModel("GaussianHMM Classifier Model", kernel, q, features_to_keep, hmmc3, parameter_dictionary=parameter_state_alphabet_inv, moments_vector=moments_vector) model_gnb = ReverbModel("Naive Bayes Classifier Model", kernel, q, features_to_keep, nbc, parameter_dictionary=parameter_state_alphabet_inv, moments_vector=moments_vector ) model_svm = ReverbModel("AVA LinearSVM Classifier Model", kernel, q, features_to_keep, myava, parameter_dictionary=parameter_state_alphabet_inv, moments_vector=moments_vector ) model_ghmm.save("model_ghmm.dat") model_gnb.save("model_gnb.dat") model_svm.save("model_svm.dat") # HMM classifier with SVN emissions # print "[II] All-vs-All custom HMM-SVM classifier Success Ratio: %.2f " % (sum(predhmmsvmc==parameters_state).astype(float)/len(parameters_state)) # # obsc = MyAVAClassifier() # obsc.fit(features_vector_upsampled.T, parameters_state) # hmm2 = HMMsvm(obsc) # hmm2.fit([features_vector_upsampled.T], [parameters_state]) # hmmsvmc = HMMsvmClassifier() # hmmsvmc.fit(features_vector_upsampled.T, parameters_state) # svmhmm = HMMSVMClassifier(N=6, n_components=max(unique(parameters_state))) # svmhmm.fit(features_vector_upsampled.T, parameters_state) # pred_svmhmm = svmhmm.predict(features_vector_upsampled.T # ) # def predict(self, features, fname_testing = "testingdat.dat', fname_model = "training.model", fname_tags = "classes.tags"): # import subprocess # # subprocess.call(["svmhmm/svm_hmm_classify", 'trainingdat.dat', 'training.model', 'classes.tags']) # hmsvmc = HmSVMClassifier() # print hmsvmc.fit(features_vector_upsampled.T, parameters_state) #plot(mse(states_to_vector(predhmmc3,parameter_state_alphabet_inv).T, #states_to_vector(parameters_state,parameter_state_alphabet_inv).T)) # # print "[II] Classifying using svmhmm..." # # # for l in range(1, len(parameters_state)+1): # svmhmm += "%d qid:1.%d " % (parameters_state[l-1], l) # for f in range(1, features_vector_upsampled.shape[0]+1): # svmhmm += "%d:%.4f " % (f, features_vector_upsampled[f-1,l-1]) # # svmhmm += "#training \n" # # # # # fileout = open("trainingdat.dat", "w") # # fileout.write(svmhmm) # fileout.close() # # import subprocess # # C = len(parameters_state)/3 # # C = 100 # #subprocess.call("svmhmm/svm_hmm_learn -c %d -e 0.5 trainingdat.dat training.model" % len(parameters_state)) # subprocess.call(["svmhmm/svm_hmm_learn",'-c', '%d' % C, '-e', '0.7', 'trainingdat.dat', 'training.model']) # # # # subprocess.call(["svmhmm/svm_hmm_classify", 'trainingdat.dat', 'training.model', 'classes.tags']) # # f = open('classes.tags') # s = f.read() # s = s[2:-2] # parameters_state_y2 = [int(d) for d in s if d!=' '] # f.close() # # plot(parameters_state_y2) # # classif_ratio_svm = 1 - float(sum(parameters_state != parameters_state_y))/len(parameters_state) # classif_ratio_svmhmm = 1- float(sum(parameters_state != parameters_state_y2))/len(parameters_state) # # print "[II] SVM classification ratio: %.2f" % classif_ratio_svm # print "[II] SVM HMM classification ratio: %.2f" % classif_ratio_svmhmm