Mercurial > hg > segmentation
view utils/plotSSM.py @ 19:890cfe424f4a tip
added annotations
author | mitian |
---|---|
date | Fri, 11 Dec 2015 09:47:40 +0000 |
parents | 8b814fe5781d |
children |
line wrap: on
line source
#!/usr/bin/env python # encoding: utf-8 """ plotSSM.py A helper util to plot SSMs from different features. """ import matplotlib # matplotlib.use('Agg') import sys, os, optparse, csv from itertools import combinations from os.path import join, isdir, isfile, abspath, dirname, basename, split, splitext from copy import copy import matplotlib.pyplot as plt import numpy as np from scipy.signal import correlate2d, convolve2d, filtfilt, resample from scipy.stats import mode, kurtosis, skew from scipy.ndimage import zoom from scipy.ndimage.morphology import binary_fill_holes from scipy.ndimage.filters import * from scipy.spatial.distance import squareform, pdist from sklearn.decomposition import PCA from sklearn.mixture import GMM from sklearn.cluster import KMeans from sklearn.preprocessing import normalize from sklearn.metrics.pairwise import pairwise_distances from skimage.transform import hough_line, hough_line_peaks, probabilistic_hough_line from skimage.filter import canny, sobel from skimage.filter.rank import otsu from skimage import data, measure, segmentation, morphology from skimage.morphology import disk from PeakPickerUtil import PeakPicker from SegUtil import getMean, getStd, getDelta, getSSM, enhanceSSM, upSample, normaliseFeature def parse_args(): # define parser op = optparse.OptionParser() # IO options op.add_option('-g', '--gammatonegram-features', action="store", dest="GF", default='/Volumes/c4dm-03/people/mit/features/gammatonegram/qupujicheng/2048', type="str", help="Loading features from.." ) op.add_option('-s', '--spectrogram-features', action="store", dest="SF", default='/Volumes/c4dm-03/people/mit/features/spectrogram/qupujicheng/2048', type="str", help="Loading features from.." ) op.add_option('-t', '--tempogram-features', action="store", dest="TF", default='/Volumes/c4dm-03/people/mit/features/tempogram/qupujicheng/tempo_features_6s', type="str", help="Loading features from.." ) op.add_option('-a', '--annotations', action="store", dest="GT", default='/Volumes/c4dm-03/people/mit/annotation/qupujicheng/lowercase', type="str", help="Loading annotation files from.. ") op.add_option('-o', '--ouput', action="store", dest="OUTPUT", default='/Volumes/c4dm-03/people/mit/segmentation/gammatone/qupujicheng', type="str", help="Write segmentation results to ") op.add_option('-p', '--plot', action="store_true", dest="PLOT", default=False, help="Save plots") op.add_option('-e', '--test-mode', action="store_true", dest="TEST", default=False, help="Test mode") op.add_option('-v', '--verbose-output', action="store_true", dest="VERBOSE", default=False, help="Exported raw detections.") return op.parse_args() options, args = parse_args() class FeatureObj() : __slots__ = ['key', 'audio', 'timestamps', 'gammatone_features', 'tempo_features', 'timbre_features', 'fp_features', 'lpc_features' 'harmonic_features', 'gammatone_ssm', 'fp_ssm', 'tempo_ssm', 'timbre_ssm', 'lpc_ssm', 'harmonic_ssm', 'ssm_timestamps'] class AudioObj(): __slots__ = ['name', 'feature_list', 'gt', 'label', 'gammatone_features', 'tempo_features', 'timbre_features', 'fp_features', 'lpc_features', 'harmonic_features', 'combined_features',\ 'gammatone_ssm', 'tempo_ssm', 'timbre_ssm', 'lpc_ssm', 'harmonic_ssm', 'fp_ssm', 'combined_ssm', 'ssm', 'ssm_timestamps', 'tempo_timestamps'] class EvalObj(): __slots__ = ['TP', 'FP', 'FN', 'P', 'R', 'F', 'AD', 'DA'] class SSMseg(object): '''The main segmentation object''' def __init__(self): self.SampleRate = 44100 self.NqHz = self.SampleRate/2 self.timestamp = [] self.previousSample = 0.0 self.featureWindow = 6.0 self.featureStep = 3.0 self.kernel_size = 64 # Adjust this param according to the feature resolution.pq self.blockSize = 2048 self.stepSize = 1024 '''NOTE: Match the following params with those used for feature extraction!''' '''NOTE: Unlike spectrogram ones, Gammatone features are extracted without an FFT or any overlap. The windowing is done under the purpose of chunking the audio to facilitate the gammatone filtering. Despite of the overlap in the time domain, only the first half after the filtering is returned, resulting in no overlapping effect in the extracted features. To obtain features for overlapped audio input, make the gammatoneLen equal to blockSize and return the whole filter output.''' self.gammatoneLen = 2048 self.gammatoneBandGroups = [0, 16, 34, 50, 64] self.nGammatoneBands = 30 self.histRes = 40 self.lowFreq = 100 self.highFreq = self.SampleRate / 4 '''Settings for extracting tempogram features.''' self.tempoWindow = 6.0 self.bpmBands = [30, 45, 60, 80, 100, 120, 180, 240, 400, 600] '''Peak picking settings''' self.threshold = 30 self.confidence_threshold = 0.5 self.delta_threshold = 0.0 self.backtracking_threshold = 1.9 self.polyfitting_on = True self.medfilter_on = True self.LPfilter_on = True self.whitening_on = False self.aCoeffs = [1.0000, -0.5949, 0.2348] self.bCoeffs = [0.1600, 0.3200, 0.1600] self.cutoff = 0.34 self.medianWin = 7 def getGaussianParams(self, length, featureRate, timeWindow): win_len = round(timeWindow * featureRate) win_len = win_len + (win_len % 2) - 1 # a 50% overlap between windows stepsize = ceil(win_len * 0.5) num_win = int(floor( (length) / stepsize)) gaussian_rate = featureRate / stepsize return stepsize, num_win, win_len, gaussian_rate def GaussianDistance(self, feature, featureRate, timeWindow): stepsize, num_win, win_len, gr = self.getGaussianParams(feature.shape[0], featureRate, timeWindow) print 'stepsize, num_win, feature', stepsize, num_win, feature.shape, featureRate, timeWindow gaussian_list = [] gaussian_timestamps = [] tsi = 0 # f = open('/Users/mitian/Documents/experiments/features.txt','w') # print 'divergence computing..' for num in xrange(num_win): # print num, num * stepsize , (num * stepsize) + win_len gf=GaussianFeature(feature[int(num * stepsize) : int((num * stepsize) + win_len), :],2) # f.write("\n%s" %str(gf)) gaussian_list.append(gf) tsi = int(floor( num * stepsize + 1)) gaussian_timestamps.append(self.timestamp[tsi]) # f.close() # print 'gaussian_list', len(gaussian_list), len(gaussian_timestamps) dm = np.zeros((len(gaussian_list), len(gaussian_list))) for v1, v2 in combinations(gaussian_list, 2): i, j = gaussian_list.index(v1), gaussian_list.index(v2) dm[i, j] = v1.distance(v2) dm[j, i] = v2.distance(v1) # print 'dm[i,j]',dm[i,j] # sio.savemat("/Users/mitian/Documents/experiments/dm-from-segmenter.mat",{"dm":dm}) return dm, gaussian_timestamps def gaussian_kernel(self, size): '''Create a gaussian tapered 45 degrees rotated checkerboard kernel. TODO: Unit testing: Should produce this with kernel size 3: 0.1353 -0.3679 0.1353 0.3679 1.0000 0.3679 0.1353 -0.3679 0.1353 ''' n = float(np.ceil(size / 2.0)) kernel = np.zeros((size,size)) for i in xrange(1,size+1) : for j in xrange(1,size+1) : gauss = np.exp( -4.0 * (np.square( (i-n)/n ) + np.square( (j-n)/n )) ) # gauss = 1 if np.logical_xor( j - n > np.floor((i-n) / 2.0), j - n > np.floor((n-i) / 2.0) ) : kernel[i-1,j-1] = -gauss else: kernel[i-1,j-1] = gauss return kernel def getDiagonalSlice(self, ssm, width): ''' Return a diagonal slice of the ssm given its width, with 45 degrees rotation. Note: requres 45 degrees rotated kernel also.''' w = int(np.floor(width/2.0)) length = len(np.diagonal(ssm)) slice = np.zeros((2*w+1,length)) # print 'diagonal', length, w, slice.shape for i in xrange(-w, w+1) : slice[w+i,:] = np.hstack(( np.zeros(int(np.floor(abs(i)/2.0))), np.diagonal(ssm,i), np.zeros(int(np.ceil(abs(i)/2.0))) )) return slice def getNoveltyCurve(self,dm, kernel_size): '''Return novelty score from distance matrix.''' kernel_size = int(np.floor(kernel_size/2.0)+1) slice = self.getDiagonalSlice(dm, kernel_size) kernel = self.gaussian_kernel(kernel_size) xc = convolve2d(slice,kernel,mode='same') xc[abs(xc)>1e+10]=0.00001 # print 'xc', xc.shape, xc return xc[int(np.floor(xc.shape[0]/2.0)),:] def mergeBlocks(self, SSM, thresh=0.9, size=5): '''Merge consequtive small blocks along the diagonal.''' # found = False # start = 0 # i = 0 # while i < len(SSM): # j = i + 1 # if found: start = i # while(j < len(SSM) and SSM[i, j]): # if (j-i) > size: # found = True # i = j # # print 'start,end', start, i # start = i # else: # found = False # j += 1 # if not found: # print 'start,end', start, i # SSM[start:i, start:i] = 0.9 # i = j idx = 1 while idx < len(SSM): i = 0 # if ((idx-1-i) > 0 and (idx+1+i) < len(SSM)): while ((idx-1-i) > 0 and (idx+1+i) < len(SSM) and SSM[idx-1-i, idx] > 0 and SSM[idx+1+i, idx] > 0): i += 1 if i > size/2: SSM[idx-1-i:min(idx+i,len(SSM)), idx-1-i:min(idx+i,len(SSM))] = 1.0 idx += max(1, i) return SSM def getGMMs(self, feature, segment_boundaries): '''Return GMMs for located segments''' gmm_list = [] gmm_list.append(GmmDistance(feature[: segment_boundaries[0], :], components = 1)) for i in xrange(1, len(segment_boundaries)): gmm_list.append(GmmDistance(feature[segment_boundaries[i-1] : segment_boundaries[i], :], components = 1)) return gmm_list def normaliseFeature(self, feature_array): feature_array = np.array(feature_array) feature_array[np.isnan(feature_array)] = 0.0 feature_array[np.isinf(feature_array)] = 0.0 if len(feature_array.shape) == 1: feature_array = (feature_array - feature_array.min()) / (feature_array.max() - feature_array.min()) else: mins = feature_array.min(axis=1) maxs = feature_array.max(axis=1) feature_array = (feature_array - mins[:, np.newaxis]) / (maxs - mins)[:, np.newaxis] feature_array[np.isnan(feature_array)] = 0.0 return feature_array def upSample(self, feature_array, step): '''Resample downsized tempogram features, tempoWindo should be in accordance with input features''' # print feature_array.shape sampleRate = 44100 stepSize = 1024.0 # step = np.ceil(sampleRate/stepSize/5.0) feature_array = zoom(feature_array, (step,1)) # print 'resampled', feature_array.shape return feature_array def stripeDistance(self, feature_array, feature_len, step, metric='cosine'): '''Return distance matrix calculated for 2d time invariant features.''' size = feature_array.shape[0] / feature_len dm = np.zeros((size, size)) for i in xrange(size): for j in xrange(i, size): dm[i, j] = np.sum(pairwise_distances(feature_array[i*step:(i+1)*step, :], feature_array[j*step:(j+1)*step, :], metric)) dm[j, i] = dm[i, j] # print 'np.nanmax(dm)', np.nanmax(dm) dm[np.isnan(dm)] = np.nanmax(dm) ssm = 1 - (dm - dm.min()) / (dm.max() - dm.min()) np.fill_diagonal(ssm, 1) return ssm def getMean(self, feature, winlen, stepsize): means = [] steps = int((feature.shape[0] - winlen + stepsize) / stepsize) for i in xrange(steps): means.append(np.mean(feature[i*stepsize:(i*stepsize+winlen), :], axis=0)) return np.array(means) def getStd(self, feature, winlen, stepsize): std = [] steps = int((feature.shape[0] - winlen + stepsize) / stepsize) for i in xrange(steps): std.append(np.std(feature[i*stepsize:(i*stepsize+winlen), :], axis=0)) return np.array(std) def getDelta(self, feature): delta_feature = np.vstack((np.zeros((1, feature.shape[1])), np.diff(feature, axis=0))) return delta_feature def getSkew(self, feature, axis=-1): return skew(feature, axis=axis) def getKurtosis(self, feature, axis=-1): return kurtosis(feature, axis=axis) def getRolloff(self, data, thresh=0.9): nFrames, nChannels, nBands = data.shape rolloff = np.zeros((nFrames, nChannels)) tpower = np.sum(data, axis=-1) freq = np.linspace(0,10,nBands) for i in xrange(nFrames): rolloffE = thresh * tpower[i] for j in xrange(nChannels): temp = 0.0 tempE = 0.0 for band in xrange(nBands): temp += data[i, j, band] if temp > rolloffE[j]: break rolloff[i, j] = freq[band] # rolloff[i, j] = band return rolloff def getCentroid(self, X): nFrames, nChannels, nBands = X.shape centroid = np.zeros((nFrames, nChannels)) freq = np.linspace(0,10,nBands) freq = np.vstack([freq for i in xrange(nChannels)]) for i in xrange(nFrames): centroid[i, :] = np.sum(X[i, :, :] * freq, axis=-1) / (np.sum(X[i, :, :], axis=-1) + 0.0005*np.ones(nChannels)) return centroid def trackDF(self, onset1_index, df2): '''In the second round of detection, remove the known onsets from the DF by tracking from the peak given by the first round to a valley to deminish the recognised peaks on top of which to start new detection.''' for idx in xrange(len(onset1_index)) : remove = True for i in xrange(onset1_index[idx], 1, -1) : if remove : if df2[i] >= df2[i-1] : df2[i] == 0.0 else: remove = False return df2 def getSSM(self, feature_array, metric='cosine', norm='simple'): '''Compute SSM given input feature array. args: norm: ['simple', 'remove_noise'] ''' dm = pairwise_distances(feature_array, metric=metric) dm = np.nan_to_num(dm) if norm == 'simple': ssm = 1 - (dm - np.min(dm)) / (np.max(dm) - np.min(dm)) return ssm def reduceSSM(self, ssm, maxfilter_size = 2, remove_size=50): reduced_ssm = ssm reduced_ssm[reduced_ssm<0.75] = 0 # # reduced_ssm = maximum_filter(reduced_ssm,size=maxfilter_size) # # reduced_ssm = morphology.remove_small_objects(reduced_ssm.astype(bool), min_size=remove_size) local_otsu = otsu(reduced_ssm, disk(5)) local_otsu = (local_otsu.astype(float) - np.min(local_otsu)) / (np.max(local_otsu) - np.min(local_otsu)) reduced_ssm = reduced_ssm - 0.6*local_otsu return reduced_ssm def pairwiseF(self, annotation, detection, tolerance=3.0, combine=1.0): '''Pairwise F measure evaluation of detection rates.''' # print 'detection', detection detection = np.append(detection, annotation[-1]) res = EvalObj() res.TP = 0 # Total number of matched ground truth and experimental data points gt = len(annotation) # Total number of ground truth data points dt = len(detection) # Total number of experimental data points foundIdx = [] D_AD = np.zeros(gt) D_DA = np.zeros(dt) for dtIdx in xrange(dt): # print detection[dtIdx], abs(detection[dtIdx] - annotation) D_DA[dtIdx] = np.min(abs(detection[dtIdx] - annotation)) # D_DA[dtIdx] = min([abs(annot - detection[dtIdx]) for annot in annotation]) for gtIdx in xrange(gt): D_AD[gtIdx] = np.min(abs(annotation[gtIdx] - detection)) # D_AD[gtIdx] = min([abs(det - annotation[gtIdx]) for det in detection]) for dtIdx in xrange(dt): if (annotation[gtIdx] >= detection[dtIdx] - tolerance/2.0) and (annotation[gtIdx] <= detection[dtIdx] + tolerance/2.0): res.TP = res.TP + 1.0 foundIdx.append(gtIdx) foundIdx = list(set(foundIdx)) res.TP = len(foundIdx) res.FP = max(0, dt - res.TP) res.FN = max(0, gt - res.TP) res.AD = np.mean(D_AD) res.DA = np.mean(D_DA) res.P, res.R, res.F = 0.0, 0.0, 0.0 if res.TP == 0: return res res.P = res.TP / float(dt) res.R = res.TP / float(gt) res.F = 2 * res.P * res.R / (res.P + res.R) # return TP3, FP3, FN3, pairwisePrecision3, pairwiseRecall3, pairwiseFValue3, TP05, FP05, FN05, pairwisePrecision05, pairwiseRecall05, pairwiseFValue05 return res def plotDetection(self, ssm, novelty, smoothed_novelty, gt, det, filename): '''Plot performance curve. x axis: distance threshold for feature selection; y axis: f measure''' plt.figure(figsize=(10,16)) gt_plot = gt / gt[-1] * len(novelty) det_plot = det / gt[-1] * len(novelty) gs = gridspec.GridSpec(2, 1, height_ratios=[3,1]) ax0 = plt.subplot(gs[0]) ax1 = plt.subplot(gs[1], sharex=ax0) ax0.imshow(ssm) ax0.vlines(gt_plot, 0, len(ssm), colors ='w', linestyles='solid') ax0.vlines(det_plot, 0, len(ssm), colors='k', linestyles='dashed') ax1.plot(np.linspace(0, len(novelty)-1, len(novelty)), novelty, 'g', np.linspace(0, len(novelty)-1, len(novelty)), smoothed_novelty,'b') y_min, y_max = min([min(novelty), min(smoothed_novelty)]), max([max(novelty), max(smoothed_novelty)]) ax1.vlines(gt_plot, y_min, y_max, colors ='r', linestyles='solid') ax1.vlines(det_plot, y_min, y_max, colors='k', linestyles='dashed') # f, ax = plt.subplots(2, sharex=True) # ax[0].imshow(ssm) # ax[1].plot(np.linspace(0, len(novelty)-1, len(novelty)), novelty) # ax[1].vlines(gt_plot, 0, len(novelty), colors ='r', linestyles='solid') # ax[1].vlines(det_plot, 0, len(novelty), colors='b', linestyles='dashed') # # plt.show() plt.savefig(filename) return None def process(self): '''For the aggregated input features, discard a propertion each time as the pairwise distances within the feature space descending. In the meanwhile evaluate the segmentation result and track the trend of perfomance changing by measuring the feature selection threshold - segmentation f measure curve. ''' peak_picker = PeakPicker() peak_picker.params.alpha = 9.0 # Alpha norm peak_picker.params.delta = self.delta_threshold # Adaptive thresholding delta peak_picker.params.QuadThresh_a = (100 - self.threshold) / 1000.0 peak_picker.params.QuadThresh_b = 0.0 peak_picker.params.QuadThresh_c = (100 - self.threshold) / 1500.0 peak_picker.params.rawSensitivity = 20 peak_picker.params.aCoeffs = self.aCoeffs peak_picker.params.bCoeffs = self.bCoeffs peak_picker.params.preWin = self.medianWin peak_picker.params.postWin = self.medianWin + 1 peak_picker.params.LP_on = self.LPfilter_on peak_picker.params.Medfilt_on = self.medfilter_on peak_picker.params.Polyfit_on = self.polyfitting_on peak_picker.params.isMedianPositive = False # Settings used for feature extraction feature_window_frame = int(self.SampleRate / self.gammatoneLen * self.featureWindow) feature_step_frame = int(0.5 * self.SampleRate / self.gammatoneLen * self.featureStep) aggregation_window, aggregation_step = 100, 50 featureRate = float(self.SampleRate) / self.stepSize audio_files = [x for x in os.listdir(options.GT) if not x.startswith(".") ] audio_files.sort() audio_files = audio_files audio_list = [] gammatone_feature_list = [i for i in os.listdir(options.GF) if not i.startswith('.')] gammatone_feature_list = ['dct', 'pcamean', 'contrast6'] tempo_feature_list = [i for i in os.listdir(options.TF) if not i.startswith('.')] tempo_feature_list = ['ti_percussive_cdsf', 'tir_percussive_cdsf'] timbre_feature_list = ['mfcc_harmonic'] lpc_feature_list = ['lpcc'] harmonic_feature_list = ['chromagram_harmonic'] gammatone_feature_list = [join(options.GF, f) for f in gammatone_feature_list] timbre_feature_list = [join(options.SF, f) for f in timbre_feature_list] # lpc_feature_list = [join(options.SF, f) for f in lpc_feature_list] tempo_feature_list = [join(options.TF, f) for f in tempo_feature_list] harmonic_feature_list = [join(options.SF, f) for f in harmonic_feature_list] fobj_list = [] # For each audio file, load specific features for audio in audio_files: ao = AudioObj() ao.name = splitext(audio)[0] # annotation_file = join(options.GT, ao.name+'.txt') # iso, salami # ao.gt = np.genfromtxt(annotation_file, usecols=0) # ao.label = np.genfromtxt(annotation_file, usecols=1, dtype=str) # annotation_file = join(options.GT, ao.name+'.csv') # qupujicheng # ao.gt = np.genfromtxt(annotation_file, usecols=0, delimiter=',') # ao.label = np.genfromtxt(annotation_file, usecols=1, delimiter=',', dtype=str) annotation_file = join(options.GT, ao.name+'.lab') # beatles ao.gt = np.genfromtxt(annotation_file, usecols=(0,1)) ao.gt = np.unique(np.ndarray.flatten(ao.gt)) ao.label = np.genfromtxt(annotation_file, usecols=2, dtype=str) gammatone_featureset, timbre_featureset, lpc_featureset, tempo_featureset, harmonic_featureset = [], [], [], [], [] for feature in gammatone_feature_list: for f in os.listdir(feature): if f[:f.find('_vamp')]==ao.name: data = np.genfromtxt(join(feature, f), delimiter=',',filling_values=0.0)[:,1:] if len(data) == 0: continue gammatone_featureset.append(data) break if len(gammatone_feature_list) > 1: n_frame = np.min([x.shape[0] for x in gammatone_featureset]) gammatone_featureset = [x[:n_frame,:] for x in gammatone_featureset] ao.gammatone_features = np.hstack((gammatone_featureset)) else: ao.gammatone_features = gammatone_featureset[0] for feature in timbre_feature_list: for f in os.listdir(feature): if f[:f.find('_vamp')]==ao.name: data = np.genfromtxt(join(feature, f), delimiter=',',filling_values=0.0)[:,1:] if len(data) == 0: continue timbre_featureset.append(data) break if len(timbre_feature_list) > 1: n_frame = np.min([x.shape[0] for x in timbre_featureset]) timbre_featureset = [x[:n_frame,:] for x in timbre_featureset] ao.timbre_features = np.hstack((timbre_featureset)) else: ao.timbre_features = timbre_featureset[0] # for feature in lpc_feature_list: # for f in os.listdir(feature): # if f[:f.find('_vamp')]==ao.name: # lpc_featureset.append(np.genfromtxt(join(feature, f), delimiter=',',filling_values=0.0)[:,1:]) # break # if len(lpc_feature_list) > 1: # n_frame = np.min([x.shape[0] for x in lpc_featureset]) # lpc_featureset = [x[:n_frame,:] for x in lpc_featureset] # ao.lpc_features = np.hstack((lpc_featureset)) # else: # ao.lpc_features = lpc_featureset[0] for feature in tempo_feature_list: for f in os.listdir(feature): if f[:f.find('_vamp')]==ao.name: data = np.genfromtxt(join(feature, f), delimiter=',',filling_values=0.0)[1:,1:] if len(data) == 0: continue tempo_featureset.append(data) ao.tempo_timestamps = np.genfromtxt(join(feature, f), delimiter=',',filling_values=0.0)[1:,0] break if len(tempo_feature_list) > 1: n_frame = np.min([x.shape[0] for x in tempo_featureset]) tempo_featureset = [x[:n_frame,:] for x in tempo_featureset] ao.tempo_features = np.hstack((tempo_featureset)) else: ao.tempo_features = tempo_featureset[0] for feature in harmonic_feature_list: for f in os.listdir(feature): if f[:f.find('_vamp')]==ao.name: data = np.genfromtxt(join(feature, f), delimiter=',',filling_values=0.0)[:,1:] if len(data) == 0: continue harmonic_featureset.append(data) break if len(harmonic_feature_list) > 1: n_frame = np.min([x.shape[0] for x in harmonic_featureset]) harmonic_featureset = [x[:n_frame,:] for x in harmonic_featureset] ao.harmonic_features = np.hstack((harmonic_featureset)) else: ao.harmonic_features = harmonic_featureset[0] # Reshape features (downsample) to match tempogram ones step = ao.tempo_features.shape[0] # aggregation_step = (n_frames / (step+1.0)) # Get aggregated features for computing ssm aggregation_window, aggregation_step = 1,1 featureRate = float(self.SampleRate) /self.stepSize pca = PCA(n_components=5) ao.gammatone_features = normaliseFeature(ao.gammatone_features) ao.gammatone_features = resample(ao.gammatone_features, step) ao.gammatone_features[np.isnan(ao.gammatone_features)] = 0.0 ao.gammatone_features[np.isinf(ao.gammatone_features)] = 0.0 ao.timbre_features = normaliseFeature(ao.timbre_features) ao.timbre_features = resample(ao.timbre_features, step) ao.timbre_features[np.isnan(ao.timbre_features)] = 0.0 ao.timbre_features[np.isinf(ao.timbre_features)] = 0.0 # ao.lpc_features = self.normaliseFeature(ao.lpc_features) # ao.lpc_features = resample(ao.lpc_features, step) # ao.lpc_features[np.isnan(ao.lpc_features)] = 0.0 # ao.lpc_features[np.isinf(ao.lpc_features)] = 0.0 ao.harmonic_features = normaliseFeature(ao.harmonic_features) ao.harmonic_features = resample(ao.harmonic_features, step) ao.tempo_features = normaliseFeature(ao.tempo_features) ao.harmonic_features[np.isinf(ao.harmonic_features)] = 0.0 ao.tempo_features[np.isnan(ao.tempo_features)] = 0.0 ao.tempo_features[np.isinf(ao.tempo_features)] = 0.0 ao.gammatone_features = getMean(ao.gammatone_features, winlen=aggregation_window, stepsize=aggregation_step) pca.fit(ao.gammatone_features) ao.gammatone_features = pca.transform(ao.gammatone_features) ao.gammatone_ssm = getSSM(ao.gammatone_features) ao.gammatone_ssm = enhanceSSM(ao.gammatone_ssm) ao.tempo_features = getMean(ao.tempo_features, winlen=aggregation_window, stepsize=aggregation_step) pca.fit(ao.tempo_features) ao.tempo_features = pca.transform(ao.tempo_features) ao.tempo_ssm = getSSM(ao.tempo_features) ao.tempo_ssm = enhanceSSM(ao.tempo_ssm) ao.timbre_features = getMean(ao.timbre_features, winlen=aggregation_window, stepsize=aggregation_step) pca.fit(ao.timbre_features) ao.timbre_features = pca.transform(ao.timbre_features) ao.timbre_ssm = getSSM(ao.timbre_features) ao.timbre_ssm = enhanceSSM(ao.timbre_ssm) # ao.lpc_features = self.getMean(ao.lpc_features, winlen=aggregation_window, stepsize=aggregation_step) # pca.fit(ao.lpc_features) # ao.lpc_features = pca.transform(ao.lpc_features) # ao.lpc_ssm = self.getSSM(ao.lpc_features) ao.harmonic_features = getMean(ao.harmonic_features, winlen=aggregation_window, stepsize=aggregation_step) pca.fit(ao.harmonic_features) ao.harmonic_features = pca.transform(ao.harmonic_features) ao.harmonic_ssm = getSSM(ao.harmonic_features) ao.harmonic_ssm = enhanceSSM(ao.harmonic_ssm) ao.ssm_timestamps = np.array(map(lambda x: ao.tempo_timestamps[aggregation_step*x], np.arange(0, ao.gammatone_ssm.shape[0]))) # Single feature SSMs. # gammatone_ssm = self.reduceSSM(ao.gammatone_ssm) plt.figure(figsize=(10, 10)) plt.vlines(ao.gt / ao.gt[-1] * ao.gammatone_ssm.shape[0], 0, ao.gammatone_ssm.shape[0]) plt.imshow(ao.gammatone_ssm) plt.savefig(join(options.OUTPUT, ao.name+'-enhanced-gammatone.pdf'),format='pdf') plt.close() # tempo_ssm = self.reduceSSM(ao.tempo_ssm) plt.figure(figsize=(10, 10)) plt.vlines(ao.gt / ao.gt[-1] * ao.tempo_ssm.shape[0], 0, ao.tempo_ssm.shape[0]) plt.imshow(ao.tempo_ssm) plt.savefig(join(options.OUTPUT, ao.name+'-enhanced-hpss_tempo.pdf'),format='pdf') plt.close() # timbre_ssm = self.reduceSSM(ao.timbre_ssm) plt.figure(figsize=(10, 10)) plt.vlines(ao.gt / ao.gt[-1] * ao.timbre_ssm.shape[0], 0, ao.timbre_ssm.shape[0]) plt.imshow(ao.timbre_ssm) plt.savefig(join(options.OUTPUT, ao.name+'-enhanced-hpss_mfcc.pdf'),format='pdf') plt.close() # # lpc_ssm = self.reduceSSM(ao.lpc_ssm) # plt.figure(figsize=(10, 10)) # plt.vlines(ao.gt / ao.gt[-1] * ao.lpc_ssm.shape[0], 0, ao.lpc_ssm.shape[0], colors='k') # plt.imshow(ao.lpc_ssm) # plt.savefig(join(options.OUTPUT, 'ssm', ao.name+'-lpcc.pdf'),format='pdf') # plt.close() # np.savetxt(join(options.OUTPUT, 'ssm_data', ao.name+'-lpcc.txt'), np.array(ao.lpc_ssm), delimiter=',') # harmonic_ssm = self.reduceSSM(ao.harmonic_ssm) plt.figure(figsize=(10, 10)) plt.vlines(ao.gt / ao.gt[-1] * ao.harmonic_ssm.shape[0], 0, ao.harmonic_ssm.shape[0]) plt.imshow(ao.harmonic_ssm) plt.savefig(join(options.OUTPUT, ao.name+'-enhanced-hpss_chroma.pdf'),format='pdf') plt.close() if options.VERBOSE: np.savetxt(join(options.OUTPUT, 'ssm_data', ao.name+'-gammatone.txt'), np.array(ao.gammatone_ssm), delimiter=',') np.savetxt(join(options.OUTPUT, 'ssm_data', ao.name+'-tempo.txt'), np.array(ao.tempo_ssm), delimiter=',') # np.savetxt(join(options.OUTPUT, 'ssm_data', ao.name+'-hpss_tempo.txt'), np.array(ao.tempo_ssm), delimiter=',') # np.savetxt(join(options.OUTPUT, 'ssm_data', ao.name+'-hpss_mfcc.txt'), np.array(ao.timbre_ssm), delimiter=',') np.savetxt(join(options.OUTPUT, 'ssm_data', ao.name+'-mfcc.txt'), np.array(ao.timbre_ssm), delimiter=',') # np.savetxt(join(options.OUTPUT, 'ssm_data', ao.name+'-lpc.txt'), np.array(ao.lpc_ssm), delimiter=',') np.savetxt(join(options.OUTPUT, 'ssm_data', ao.name+'-chroma.txt'), np.array(ao.harmonic_ssm), delimiter=',') # np.savetxt(join(options.OUTPUT, 'ssm_data', ao.name+'-hpss_chroma.txt'), np.array(ao.harmonic_ssm), delimiter=',') # audio_list.append(ao) def main(): segmenter = SSMseg() segmenter.process() if __name__ == '__main__': main()