Mercurial > hg > chourdakisreiss2016
view experiment-reverb/code/supervised_training_hmms_higher_order.py @ 2:c87a9505f294 tip
Added LICENSE for code, removed .wav files
author | Emmanouil Theofanis Chourdakis <e.t.chourdakis@qmul.ac.uk> |
---|---|
date | Sat, 30 Sep 2017 13:25:50 +0100 |
parents | 246d5546657c |
children |
line wrap: on
line source
#!/usr/bin/python2 # -*- coding: utf-8 -*- """ Created on Thu Apr 23 11:53:17 2015 @author: mmxgn """ # This file does the cluster estimation and the removal of outliers from sys import argv, exit from essentia.standard import YamlInput, YamlOutput from essentia import Pool from pca import * from numpy import * from sklearn import cluster from sklearn.metrics import pairwise_distances from sklearn.cluster import KMeans, MiniBatchKMeans from matplotlib.pyplot import * #from sklearn.mixture import GMM from sklearn.naive_bayes import GaussianNB, MultinomialNB from scipy.signal import decimate from sklearn import cross_validation #from hmmlearn import hmm from hmmlearn.hmm import GMM from hmmlearn import hmm from sklearn import svm #from adpcm import adm, adm_reconstruct mse = lambda A,B: ((array(A)-array(B)) ** 2).mean() def smooth_matrix_1D(X): window = scipy.signal.gaussian(51,4) window = window/sum(window) intermx = zeros((X.shape[0],X.shape[1]+100)) intermx[:, 50:-50] = X for m in range(0, X.shape[0]): # print intermx.shape intermx[m,:] = convolve(ravel(intermx[m,:]), window,'same') return intermx[:,50:-50] def adm_reconstruct(codeword, h, dmin=.01, dmax=.28): x = zeros((1, codeword.shape[1])) delta1 = dmin delta2 = dmin Sum = h x[0] = h for i in range(0, codeword.shape[1]): if codeword[0,i] == 0: delta1 = dmin delta2 = dmin elif codeword[0,i] == 1: delta2 = dmin Sum += delta1 delta1 *= 2 if delta1 > dmax: delta1 = dmax elif codeword[0,i] == -1: delta1 = dmin Sum -= delta2 delta2 *= 2 if delta2 > dmax: delta2 = dmax x[0,i] = Sum return x def adm(x, dmin=.01, dmax=.28, tol=0.0001): # Adaptive delta modulation adapted by code: # (adeltamod.m) # # Adaptive Delta Modulator # by Gandhar Desai (gdesai) # BITS Pilani Goa Campus # Date: 28 Sept, 2013 xsig = x Lx = len(x) ADMout = zeros((1, Lx)) codevec = zeros((1, Lx)) Sum = x[0] delta1 = dmin delta2 = dmin mult1 = 2 mult2 = 2 for i in range(0, Lx): #print abs(xsig[i] - Sum) if (abs(xsig[i] - Sum) < tol): bit = 0 delta2 = dmin delta1 = dmin elif (xsig[i] >= Sum): bit = 1 delta2 = dmin Sum += delta1 delta1 *= mult1 if delta1 > dmax: delta1 = dmax else: bit = -1 delta1 = dmin Sum -= delta2 delta2 *= mult2 if delta2 > dmax: delta2 = dmax ADMout[0, i] = Sum codevec[0, i]= bit return ADMout,codevec, x[0] if __name__=="__main__": if len(argv) != 2: print "[EE] Wrong number of arguments" print "[II] Correct syntax is:" print "[II] \t%s <training_file>" print "[II] where <training_file> is a .yaml file containing the" print "[II] features of the dataset (try output2_stage/fulltraining-last.yaml)" exit(-1) n_clusters = 4 UpsamplingFactor = 1 dmin = 0.001 dmax = 0.28 tol = 0.001 infile = argv[1] features_pool = YamlInput(filename = infile)() feature_captions = features_pool.descriptorNames() parameter_captions = [] for c in features_pool.descriptorNames(): if c.split('.')[0] == 'parameter': parameter_captions.append(c) if c.split('.')[0] == 'metadata' or c.split('.')[0] == 'parameter': feature_captions.remove(c) # close('all') print "[II] Loaded training data from %s (%s) " % (infile, features_pool['metadata.date'][0]) print "[II] %d Features Available: " % len(feature_captions) print str(feature_captions).replace("', ","\n").replace('[','').replace("'","[II]\t ")[:-7] nfeatures_in = len(feature_captions) nparameters_in = len(parameter_captions) features_vector = zeros((nfeatures_in, len(features_pool[feature_captions[0]]))) parameters_vector = zeros((nparameters_in, len(features_pool[parameter_captions[0]]))) for i in range(0, nfeatures_in): features_vector[i, :] = features_pool[feature_captions[i]].T for i in range(0, nparameters_in): parameters_vector[i, :] = features_pool[parameter_captions[0]].T print "[II] %d parameters used:" % len(parameter_captions) print str(parameter_captions).replace("', ","\n").replace('[','').replace("'","[II]\t ")[:-7].replace('parameter.','') print "[II] Marking silent parts" silent_parts = zeros((1, len(features_pool[feature_captions[i]].T))) rms = features_vector[feature_captions.index('rms'), :] # Implementing Hysteresis Gate -- High threshold is halfway between # the mean and the max and Low is halfway between the mean dn the min rms_threshold_mean = mean(rms) rms_threshold_max = max(rms) rms_threshold_min = min(rms) rms_threshold_high = 0.1 * rms_threshold_mean rms_threshold_low = 0.01 * rms_threshold_mean for n in range(1, len(rms)): prev = rms[n-1] curr = rms[n] if prev >= rms_threshold_high: if curr < rms_threshold_low: silent_parts[0,n] = 1 else: silent_parts[0,n] = 0 elif prev <= rms_threshold_low: if curr > rms_threshold_high: silent_parts[0,n] = 0 else: silent_parts[0,n] = 1 else: silent_parts[0,n] = silent_parts[0,n-1] if silent_parts[0,1] == 1: silent_parts[0, 0] = 1 active_index = invert(silent_parts.flatten().astype(bool)) # Keep only active parts # Uncomment this features_vector = features_vector[:, active_index] moments_vector = zeros((features_vector.shape[0], 2)) print "[II] Storing moments vector" for i in range(0, features_vector.shape[0]): mean_ = mean(features_vector[i,:]) std_ = std(features_vector[i,:], ddof=1) moments_vector[i,0] = mean_ moments_vector[i,1] = std_ features_vector[i,:] = (features_vector[i,:] - mean_)/std_ features_vector_original = features_vector print "[II] Extracting PCA configuration " kernel, q, featurelist = extract_pca_configuration_from_data(features_vector) print "[II] Optimal number of PCs to keep: %d" % q feature_captions_array = array(feature_captions) features_to_keep = list(feature_captions_array[featurelist]) print "[II] Decided to keep %d features:" % len(features_to_keep) print str(features_to_keep).replace("', ","\n").replace('[','').replace("'","[II]\t ")[:-7] features_kept_data = features_vector[featurelist,:] features_vector = (kernel.T*features_kept_data)[0:q,:] parameters_k_means = KMeans(init='k-means++', n_init=10, max_iter=300, tol=0.0000001, verbose = 0) print "[II] Trying ADM-coded parameters" print "[II] Upsampling features and parameters by a factor of %d" % UpsamplingFactor # Upsampled features and parameters features_vector_upsampled = smooth_matrix_1D(repeat(features_vector,UpsamplingFactor, axis=1)) # feature_labels_upsampled = repeat(features_clustering_labels,UpsamplingFactor, axis=0) parameters_vector_upsampled = repeat(parameters_vector,UpsamplingFactor, axis=1) # parameters_vector_upsampled = smooth_matrix_1D(parameters_vector_upsampled) parameters_vector_upsampled_adm = matrix(zeros(shape(parameters_vector_upsampled))) parameters_vector_upsampled_code = matrix(zeros(shape(parameters_vector_upsampled))) parameters_vector_upsampled_firstval = matrix(zeros((parameters_vector_upsampled.shape[0],1))) # Reconstructed parameters parameters_vector_upsampled_reconstructed = matrix(zeros(shape(parameters_vector_upsampled))) def adm_matrix(X, dmin=0.001,dmax=0.28,tol=0.001): out = matrix(zeros(shape(X))) code = matrix(zeros(shape(X))) firstval = matrix(zeros((X.shape[0], 1))) for i in range(0, X.shape[0]): out[i,:], code[i,:], firstval[i,0] = adm(X[i,:],dmin=dmin,dmax=dmax,tol=tol) return out,code,firstval # parameters_vector_upsampled_reconstructed[i,:] = adm_reconstruct(parameters_vector_upsampled_code[i,:],parameters_vector_upsampled_firstval[i,0], dmin=dmin,dmax=dmax) def adm_matrix_reconstruct(code, firstval, dmin=0.001, dmax=0.28): X = matrix(zeros(shape(code))) for i in range(0, code.shape[0]): X[i,:] = adm_reconstruct(code[i,:], firstval[i,0], dmin=dmin, dmax=dmax) return X parameters_vector_upsampled_adm, parameters_vector_upsampled_code, parameters_vector_upsampled_firstval = adm_matrix(parameters_vector_upsampled, dmin, dmax, tol) def diff_and_pad(X): return concatenate(( zeros(( shape(X)[0], 1 )), diff(X, axis=1)), axis=1) print "[II] Clustering features." # features_clustering = GMM(n_components = n_clusters, covariance_type='diag') # features_clustering.fit( features_vector_upsampled.T, y=parameters_vector_upsampled_code) # features_clustering_means = features_clustering.means_ features_clustering_labels = features_clustering.predict(features_vector_upsampled.T) features_clustering_sigmas = features_clustering.covars_ # features_vector_upsampled_estimated = zeros(shape(features_vector_upsampled)) # # for n in range(0, len(features_vector_upsampled_estimated[0])): features_vector_upsampled_estimated[:,n] = features_clustering_means[features_clustering_labels[n]] # # print "[II] Features MSE for %d clusters: %.3f" % (n_clusters, mse(features_vector_upsampled, features_vector_upsampled_estimated)) def happy_curve_classification(data, classes, estimator, Nd=1): print "[II] Generating Happy Curve " from copy import deepcopy estimator_fulldata = deepcopy(estimator) estimator_fulldata.fit(data, classes) labels = estimator.predict(data) # 1. Split data in two, training and testing Ntr = int(round(data.shape[0]/2)) # Training dataset size Nts = data.shape[0] - Ntr # Testing dataset size ratios = [] for n in range(Nd, Ntr): train, test, trainl, testl = cross_validation.train_test_split(data, classes, test_size = 0.5, random_state = 0) train = train[0:n,:] trainl = trainl[0:n] # print "trainl" # print trainl estimator_ = deepcopy(estimator) estimator_.fit(train,trainl) labels = estimator_.predict(test) ratio = sum(array(testl==labels).astype(float))/len(labels) ratios.append(ratio) return ratios def cross_validate_classification(data, classes, estimator): print "[II] Crossvalidating... " from copy import deepcopy estimator_fulldata = deepcopy(estimator) estimator_fulldata.fit(data, classes) percents = arange(0.1,0.9,0.1) MSEs = [] labels = estimator.predict(data) print "[II] for full training-testing: %.2f" % (sum(array(classes==labels).astype(float))/len(labels)) for p in percents: train,test,trainlabels,testlabels = cross_validation.train_test_split(data,classes,test_size=p,random_state=0) estimator_ = deepcopy(estimator) estimator_.fit(train, trainlabels) labels = estimator_.predict(test) print "[II] for training(%.2f)-testing(%.2f): %.2f" % ((1-p),p,sum(array(testlabels==labels).astype(float))/len(labels)) return MSEs def cross_validate_clustering(data, estimator): print "[II] Crossvalidating... " estimator_fulldata = estimator estimator_fulldata.fit(data) # labels = estimator_fulldata.predict(data) means = estimator_fulldata.means_ # print means percents = arange(0.1,0.6,0.1) MSEs = [] reconstructed = zeros(shape(data)) labels = estimator.predict(data) for n in range(0, len(reconstructed)): reconstructed[n,:] = means[labels[n]] MSEs.append(mse(data,reconstructed)) for p in percents: train,test = cross_validation.train_test_split(data,test_size=p,random_state=0) train = matrix(train) test = matrix(test) estimator.fit(train) means = estimator.means_ labels = estimator.predict(test) reconstructed = zeros(shape(test)) for n in range(0, len(reconstructed)): reconstructed[n,:] = means[labels[n]] m = mse(test,reconstructed) print "[II] MSE for clustering crossvalidated data %.2f-%.2f: %.5f" % ((1-p), p, m) MSEs.append(m) print "[II] Crossvalidation complete" return MSEs # Construct parameters alphabet, each symbol is going to be a different column vector # in parameter code matrix def vector_to_states(X): """ Input: a vector MxN with N samples and M variables Output: a codeword dictionary `parameters_alphabet', state_seq, inverse `parameters_alphabet_inv' """ parameters_alphabet = {} n = 0 for i in range(0, X.shape[1]): vec = tuple(ravel(X[:,i])) if vec not in parameters_alphabet: parameters_alphabet[vec] = n n += 1 parameters_alphabet_inv = dict([(parameters_alphabet[m],m) for m in parameters_alphabet]) state_seq = array([parameters_alphabet[tuple(ravel(X[:,m]))] for m in range(0, parameters_vector_upsampled_code.shape[1])] ) return state_seq, parameters_alphabet, parameters_alphabet_inv def states_to_vector(predicted, parameters_alphabet_inv): estimated = matrix(zeros((len(parameters_alphabet_inv[0]), len(predicted)))) for i in range(0, len(state_seq)): estimated[:, i] = matrix(parameters_alphabet_inv[predicted[i]]).T return estimated state_seq, parameters_alphabet, parameters_alphabet_inv = vector_to_states(parameters_vector_upsampled_code) parameters_change_variable = matrix(diff_and_pad(parameters_vector_upsampled)!=0).astype(int) changes_state_seq, changes_parameters_alphabet, changes_parameters_alphabet_inv = vector_to_states(parameters_change_variable) # This is an hmm that just codes the changes" # We have only two states, change and stay the same. # Uncomment that here # parameters_vector_upsampled = parameters_vector_upsampled_code # parameters_state_orig, parameter_state_alphabet_orig, parameter_state_alphabet_inv_orig = vector_to_states(parameters_vector) parameters_state, parameter_state_alphabet, parameter_state_alphabet_inv = vector_to_states(parameters_vector_upsampled) print "[II] Testing Gaussian Naive Bayes Classifier" gnb = GaussianNB() gnb.fit(features_vector_upsampled.T, parameters_state) parameters_state_estimated = gnb.predict(features_vector_upsampled.T) output = states_to_vector(parameters_state_estimated, parameter_state_alphabet_inv) figure() subplot(211) plot(parameters_vector_upsampled.T) title('Parameter value upsampled by a factor of %d' % UpsamplingFactor) ylabel('value') xlabel('frame #') subplot(212) #plot(smooth_matrix_1D(output).T) plot(output.T) ylabel('value') xlabel('frame #') #cross_validate_classification(features_vector_upsampled.T, parameters_state, gnb) # hc = happy_curve_classification(features_vector_upsampled.T, parameters_state, gnb) # # figure() # plot(hc) # figure() print "[II] Trying Multinomial HMM" # In order to do classification with HMMs, we need to: # 1. Split the parameters into classes # 2. Train one model per class # 3. Feed our data to all the models # 4. Check which has a better score and assig,n to M class HmmClassifier: def __init__(self, N=2, n_components = 1): self.n_components = n_components self.chain_size = N self.hmms_ = [] self.N = N def fit(self, X, states): self.n_states = len(unique(states)) for n in range(0, self.n_states): hmm_ = hmm.GaussianHMM(n_components = self.n_components, covariance_type = 'full') # Get training data for each class vector = X[states == n,:] # Fit the HMM # print vector hmm_.fit([vector]) # And append to the list self.hmms_.append(hmm_) def predict(self,X): labels = zeros((X.shape[0],)) N = self.N m = 0 scores = zeros((1, self.n_states)) while m*N < X.shape[0]: if (m+1)*N > X.shape[0]: testdata = X[m*N:,:] else: testdata = X[m*N:(m+1)*N,:] # print testdata for i in range(0, self.n_states): scores[0,i] = self.hmms_[i].score(testdata) if (m+1)*N > X.shape[0]: labels[m*N:] = argmax(scores) else: labels[m*N:(m+1)*N] = argmax(scores) m+= 1 return labels N = 2 n_components = 1 hmmc = HmmClassifier(N = N, n_components = n_components) hmmc.fit(features_vector_upsampled.T, parameters_state) # hchmmc = happy_curve_classification(features_vector_upsampled.T, parameters_state, hmmc, Nd=100) # cross_validate_classification(features_vector_upsampled.T, parameters_state, hmmc) # hmms_ = [] # # for n in range(0, len(parameter_state_alphabet)): # #hmm_ = hmm.GMMHMM(n_components = 1, n_mix = 2) # hmm_ = hmm.GaussianHMM(n_components = 1,covariance_type = 'full') # # # Get training data for each class # vector = features_vector_upsampled[:,parameters_state == n] # # #if vector.shape[1] < n_clusters: # # hmms_.append(None) # #else: # # hmm_.fit([vector.T]) # # # Append to the list # # hmms_.append(hmm_) # # labels = zeros((features_vector_upsampled.shape[1],)) # # N = 20 # m = 0 # # while m*N < features_vector_upsampled.shape[1]: # # scores = zeros((1, len(parameter_state_alphabet))) # # if (m+1)*N > features_vector_upsampled.shape[1]: # testdata = features_vector_upsampled[:,m*N:] # else: # testdata = features_vector_upsampled[:,m*N:(m+1)*N] # # for i in range(0, len(parameter_state_alphabet)): # if hmms_[i] is not None: # scores[0,i] = hmms_[i].score(testdata.T) # else: # scores[0,i] = -100000 # Very large negative score # if (m+1)*N >= features_vector_upsampled.shape[1]: # labels[m*N:] = argmax(scores) # else: # labels[m*N:(m+1)*N] = argmax(scores) # # m += 1 # figure() #plot(labels.T) labels = hmmc.predict(features_vector_upsampled.T) estimated = states_to_vector(labels,parameter_state_alphabet_inv) plot(estimated.T,'r--') title('Estimated parameter values') legend(['Naive Bayes Classifier', 'HMM chain size %d (%.1fms)' % (N, float(N)/UpsamplingFactor*23.0)]) ylabel('value') xlabel('frame #') close('all') plot(features_clustering_labels/float(max(features_clustering_labels))) plot(parameters_vector_upsampled.T/max(ravel(parameters_vector_upsampled))) def plot_clusters(x, labels): figure() symbols = ['>', 'x', '.', '<','v'] colors = ['b', 'r', 'g', 'm','c'] for r in range(0, x.shape[0]): scatter(x[r,0], x[r,1], c=colors[int(labels[r]) % len(colors)], marker=symbols[int(labels[r]) % len(symbols)]) plot_clusters(features_vector_upsampled.T, parameters_state) # SVM X = features_vector_upsampled.T y = parameters_state clf = svm.SVC() clf.fit(X,y) parameters_state_y = clf.predict(X) #plot(parameters)