Mercurial > hg > segmentation
view HSeg.py @ 18:b4bf37f94e92
prepared to add another annotation
author | mitian |
---|---|
date | Wed, 09 Dec 2015 16:27:10 +0000 |
parents | c01fcb752221 |
children |
line wrap: on
line source
#!/usr/bin/env python # encoding: utf-8 """ HSeg.py Created by mi tian on 2015-08-14. Copyright (c) 2015 __MyCompanyName__. All rights reserved. """ import sys, os, optparse, csv from itertools import combinations from os.path import join, isdir, isfile, abspath, dirname, basename, split, splitext from copy import copy import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt import matplotlib.gridspec as gridspec import numpy as np import scipy as sp from scipy.signal import correlate2d, convolve2d, filtfilt, resample from scipy.ndimage.filters import * from scipy.ndimage.interpolation import zoom from sklearn.decomposition import PCA from sklearn.mixture import GMM from sklearn.cluster import KMeans from sklearn.preprocessing import normalize from sklearn.metrics.pairwise import pairwise_distances # Load dependencies from utils.SegUtil import getMean, getStd, getDelta, getSSM, reduceSSM, upSample, normaliseFeature, normaliseArray from utils.PeakPickerUtil import PeakPicker from utils.gmmdist import * from utils.GmmMetrics import GmmDistance from utils.RankClustering import rClustering from utils.kmeans import Kmeans # Using the novelty based (Tian) boundary retrieval method import novelty as novelty_S import sf as sf_S import cnmf as cnmf_S # Algorithm params h = 8 # Size of median filter for features in C-NMF R = 15 # Size of the median filter for the activation matrix C-NMF rank = 4 # Rank of decomposition for the boundaries rank_labels = 6 # Rank of decomposition for the labels R_labels = 6 # Size of the median filter for the labels # Foote M = 2 # Median filter for the audio features (in beats) Mg = 32 # Gaussian kernel size L = 16 # Size of the median filter for the adaptive threshold # 2D-FMC N = 8 # Size of the fixed length segments (for 2D-FMC) # Define arg parser def parse_args(): op = optparse.OptionParser() # IO options op.add_option('-f', '--features1', action="store", dest="F1", default='/Users/mitian/Documents/experiments/mit/features/gammatonegram_fft/qupujicheng/2048_1024', type="str", help="Loading features from.." ) op.add_option('-e', '--features2', action="store", dest="F2", default='/Users/mitian/Documents/experiments/mit/features/gammatonegram_fft/qupujicheng/2048_1024', type="str", help="Loading features from.." ) op.add_option('-a', '--annotations', action="store", dest="GT", default='/Volumes/c4dm-03/people/mit/annotation/qupujicheng/lowercase', type="str", help="Loading annotation files from.. ") op.add_option('-d', '--dataset', action="store", dest="DATASET", default='qupujicheng', type="str", help="Specify datasets") op.add_option('-o', '--output', action="store", dest="OUTPUT", default=' /Users/mitian/Documents/experiments/mit/gmm/gammatone_fft/qupujicheng', type="str", help="Loading annotation files from.. ") op.add_option('-c', '--cache', action="store", dest="CACHE", default='/Users/mitian/Documents/experiments/mit/gmm/gammatone_fft/qupujicheng', type="str", help="Saving temporary cache files to.. ") op.add_option('-n', '--name', action="store", dest="NAME", default=None, type="str", help="Save output under the name..") # Plot/print/mode options op.add_option('-b', '--boundary-method', action="store_true", dest="BOUNDARY_ALL", default=False, help="Use all boundary method.") op.add_option('-p', '--plot', action="store_true", dest="PLOT", default=False, help="Save plots") op.add_option('-t', '--test-mode', action="store_true", dest="TEST", default=False, help="Test mode") op.add_option('-v', '--verbose-mode', action="store_true", dest="VERBOSE", default=False, help="Print results in verbose mode.") return op.parse_args() options, args = parse_args() class AudioObj(): __slots__ = ['name', 'feature_list', 'gt', 'label', 'features1', 'features2', 'ssm_timestamps'] class EvalObj(): __slots__ = ['TP', 'FP', 'FN', 'P', 'R', 'F', 'AD', 'DA', 'detection'] class Seg(object): '''The main segmentation object''' def __init__(self): self.SampleRate = 44100 self.NqHz = self.SampleRate/2 self.timestamp = [] self.previousSample = 0.0 self.featureWindow = 6.0 self.featureStep = 3.0 self.kernel_size = 100 # Adjust this param according to the feature resolution.pq self.blockSize = 2048 self.stepSize = 1024 '''NOTE: Match the following params with those used for feature extraction!''' '''NOTE: Unlike spectrogram ones, Gammatone features are extracted without taking an FFT. The windowing is done under the purpose of chunking the audio to facilitate the gammatone filtering with the specified blockSize and stepSize. The resulting gammatonegram is aggregated every gammatoneLen without overlap.''' self.gammatoneLen = 2048 self.gammatoneBandGroups = [0, 2, 6, 10, 13, 17, 20] self.nGammatoneBands = 20 self.lowFreq = 100 self.highFreq = self.SampleRate / 4 '''Settings for extracting tempogram features.''' self.tempoWindow = 6.0 self.bpmBands = [30, 45, 60, 80, 100, 120, 180, 240, 400, 600] '''Peak picking settings for novelty based method''' self.threshold = 30 self.confidence_threshold = 0.5 self.delta_threshold = 0.0 self.backtracking_threshold = 1.9 self.polyfitting_on = True self.medfilter_on = True self.LPfilter_on = True self.whitening_on = False self.aCoeffs = [1.0000, -0.5949, 0.2348] self.bCoeffs = [0.1600, 0.3200, 0.1600] self.cutoff = 0.34 self.medianWin = 7 def pairwiseF(self, annotation, detection, tolerance=3.0, combine=1.0, idx2time=None): '''Pairwise F measure evaluation of detection rates.''' res = EvalObj() res.TP, res.FP, res.FN = 0, 0, 0 res.P, res.R, res.F = 0.0, 0.0, 0.0 res.AD, res.DA = 0.0, 0.0 if len(detection) == 0: return res if idx2time != None: # Map detected idxs to real time detection.sort() if detection[-1] >= len(idx2time): detection = detection[:-len(np.array(detection)[np.array(detection)-len(idx2time)>=0])] detection = [idx2time[int(i)] for i in detection] detection = np.append(detection, annotation[-1]) res.detection = detection gt = len(annotation) # Total number of ground truth data points dt = len(detection) # Total number of experimental data points foundIdx = [] D_AD = np.zeros(gt) D_DA = np.zeros(dt) for dtIdx in xrange(dt): D_DA[dtIdx] = np.min(abs(detection[dtIdx] - annotation)) for gtIdx in xrange(gt): D_AD[gtIdx] = np.min(abs(annotation[gtIdx] - detection)) for dtIdx in xrange(dt): if (annotation[gtIdx] >= detection[dtIdx] - tolerance/2.0) and (annotation[gtIdx] <= detection[dtIdx] + tolerance/2.0): foundIdx.append(gtIdx) continue foundIdx = list(set(foundIdx)) res.TP = len(foundIdx) # res.FP = dt - res.TP res.FP = max(0, dt - res.TP) res.FN = gt - res.TP res.AD = np.mean(D_AD) res.DA = np.mean(D_DA) if res.TP == 0: return res res.P = res.TP / float(res.TP+res.FP) res.R = res.TP / float(res.TP+res.FN) res.F = 2 * res.P * res.R / (res.P + res.R) return res def writeVerifiedHeader(self, filename): '''Write header of output files for verified segmentation.''' with open(filename, 'a') as f: csvwriter = csv.writer(f, delimiter=',') csvwriter.writerow(['audio', 'novelty_05_TP', 'novelty_05_FP', 'novelty_05_FN', 'novelty_05_P', 'novelty_05_R', 'novelty_05_F', 'novelty_05_AD', 'novelty_05_DA', 'novelty_3_TP', 'novelty_3_FP', 'novelty_3_FN', 'novelty_3_P', 'novelty_3_R', 'novelty_3_F', 'novelty_3_AD', 'novelty_3_DA',\ 'verified_novelty_05_TP', 'verified_novelty_05_FP', 'verified_novelty_05_FN', 'verified_novelty_05_P', 'verified_novelty_05_R', 'verified_novelty_05_F', 'verified_novelty_05_AD', 'verified_novelty_05_DA', 'verified_novelty_3_TP', 'verified_novelty_3_FP',\ 'gt_verified_3_FN']) def writeVerifiedRes(self, filename, ao_name, novelty_05, novelty_3, verified_novelty_05, verified_novelty_3): '''Write result of single detection for verified segmentation.''' with open(filename, 'a') as f: csvwriter = csv.writer(f, delimiter=',') csvwriter.writerow([ao_name, novelty_05.TP, novelty_05.FP, novelty_05.FN, novelty_05.P, novelty_05.R, novelty_05.F, novelty_05.AD, novelty_05.DA, novelty_3.TP, novelty_3.FP, novelty_3.FN, novelty_3.P, novelty_3.R,\ novelty_3.F, novelty_3.AD, novelty_3.DA, verified_novelty_05.TP, verified_novelty_05.FP, verified_novelty_05.FN, verified_novelty_05.P, verified_novelty_05.R, verified_novelty_05.F, verified_novelty_05.AD, verified_novelty_05.DA, verified_novelty_3.TP, verified_novelty_3.FP, verified_novelty_3.FN, verified_novelty_3.P, verified_novelty_3.R, verified_novelty_3.F, verified_novelty_3.AD, verified_novelty_3.DA]) def localBoundaries(self, bound_idx, feature, confidence, peak_picker, thresh=0.3, tol=3, metric='novelty'): '''Detect local bounderies within fixed-len window around a boudary from the first round detection. args: bound_idx: index of boundary condidate for local inspection feature: an alternative feature for local pairwise distance measuring confidence: a list of confidence values assigned to all boundary candidates. thresh: threshold for boundary confidence tol: window length (L = 2*tol + 1) for extracting local features (unit=s) metric: 'novelty' (default), 'sf', 'cnmf' ''' local_boundaries = [] smoothed_local_novelty = None tol_win = float(tol) / self.stepSize * self.SampleRate if confidence[bound_idx] < thresh: return local_boundaries, smoothed_local_novelty # print 'bound_idx', bound_idx, len(confidence), feature.shape # If the boundary is of high relative confidence, keep it anyway if (1 < bound_idx < len(confidence)-1): if confidence[bound_idx-1] / thresh <= confidence[bound_idx] and confidence[bound_idx-1] / thresh <= confidence[bound_idx]: local_boundaries.append(bound_idx) pre_win = np.max([int(bound_idx-tol_win), 0]) post_win = np.min([int(bound_idx+tol_win), len(confidence)]) local_feature = feature[pre_win: post_win, :] local_ssm = pairwise_distances(local_feature, metric='cosine') if metric == 'novelty': local_novelty, smoothed_local_novelty, local_idxs = novelty_S.process(local_ssm, peak_picker, 48) elif metric == 'sf': nc, local_idxs = sf_S.segmentation(local_ssm) elif metric == 'cnmf': G, local_idxs = cnmf_S.segmentation(local_ssm) if local_idxs: local_idxs += map(lambda x: x+bound_idx, local_idxs) # print local_idxs return list(set(local_idxs)), smoothed_local_novelty def verifyPeaks(self, bound_idx, second_detection, thresh=0.6, tol = 3, second_detection_conf=None): '''Pruning second round detection. args: bound_idx: index of boundary condidate for local inspection second_detection: a list of peaks detected in the second round near a boudary candidates. thresh: confidence threshold for discarding detection second_detection_conf: a list of confidence values assigned to all local peaks. ''' tol_win = float(tol) / self.stepSize * self.SampleRate # Select peak with the highest local confidence if second_detection_conf: if np.max(second_detection_conf) > thresh: verified = bound_idx + np.argmax(second_detection_conf) - tol_win return verified else: return None # Select peak closest to the 1st round detection elif second_detection: # pos = np.argmin(abs(np.array(second_detection)-bound_idx)) pos = int(np.mean(np.where(abs(np.array(second_detection)-bound_idx) == abs(np.array(second_detection)-bound_idx).min())[0])) verified_peak = second_detection[pos] return verified_peak # No peak is secured around bound_idx in the second round verification else: return None def secondRoundDetection(self, peak_candidates, candidates_conf, feature, peak_verifier=None, tol=3, thresh1=0.4, thresh2=0.5): '''Second round detection.''' peaks = [] tol_win = float(tol) / self.stepSize * self.SampleRate for i, x in enumerate(peak_candidates): # Bypass peak candidates with low confidence if candidates_conf[x] < thresh1: continue # If with high confidence, keep it straight if candidates_conf[x] > 0.7: peaks.append(x) continue # 2nd round detection for questionable peaks pre_win = np.max([int(x-tol_win), 0]) post_win = np.min([int(x+tol_win), len(candidates_conf)]) if pre_win == post_win: continue local_feature = feature[pre_win: post_win, :] local_ssm = pairwise_distances(local_feature, metric='cosine') local_conf, local_peaks = novelty_S.process(local_ssm,peak_verifier,100)[-2:] if len(local_peaks)==0: continue local_conf = normaliseArray(local_conf) # Keep the one detected from the 2nd around with the highest confidence as final peak # if np.max(local_conf[local_peaks]) > thresh2: if local_conf[tol_win] >= thresh2: #v2 local_bound = x - tol_win + np.argmax(local_conf) peaks.append(np.rint(local_bound)) # remove very closely located peaks (duplicates from different rounds of detection) peaks.sort() return peaks def process(self): '''Load precomputed features for all audio samples and make segmentation calls.''' # peak_picker for the 1st round boudary detection peak_picker = PeakPicker() peak_picker.params.alpha = 9.0 # Alpha norm peak_picker.params.delta = self.delta_threshold # Adaptive thresholding delta peak_picker.params.QuadThresh_a = (100 - self.threshold) / 1000.0 peak_picker.params.QuadThresh_b = 0.0 peak_picker.params.QuadThresh_c = (100 - self.threshold) / 1500.0 peak_picker.params.rawSensitivity = 20 peak_picker.params.aCoeffs = self.aCoeffs peak_picker.params.bCoeffs = self.bCoeffs peak_picker.params.preWin = self.medianWin peak_picker.params.postWin = self.medianWin + 1 peak_picker.params.LP_on = self.LPfilter_on peak_picker.params.Medfilt_on = self.medfilter_on peak_picker.params.Polyfit_on = self.polyfitting_on peak_picker.params.isMedianPositive = False # peak_picker for the second round boudary verification (might need lower sensitivity) peak_verifier = PeakPicker() peak_verifier.params.alpha = 9.0 # Alpha norm peak_verifier.params.delta = self.delta_threshold # Adaptive thresholding delta peak_verifier.params.QuadThresh_a = (100 - 20) / 1000.0 peak_verifier.params.QuadThresh_b = 0.0 peak_verifier.params.QuadThresh_c = (100 - 20) / 1500.0 peak_verifier.params.rawSensitivity = 20 peak_verifier.params.aCoeffs = self.aCoeffs peak_verifier.params.bCoeffs = self.bCoeffs peak_verifier.params.preWin = 20 peak_verifier.params.postWin = 20 + 1 peak_verifier.params.LP_on = self.LPfilter_on peak_verifier.params.Medfilt_on = self.medfilter_on peak_verifier.params.Polyfit_on = self.polyfitting_on peak_verifier.params.isMedianPositive = False # Getting aggregated features. featureRate = float(self.SampleRate) / self.stepSize aggregation_window, aggregation_step = 20, 10 tempo_step = 0.204995725 * featureRate audio_files = [x for x in os.listdir(options.GT) if not x.startswith(".") ] audio_files.sort() if options.TEST: audio_files = audio_files[:2] audio_list = [] # Use mfccs feature 1st round segmentation (coarse) feature_list1 =['mfcc_harmonic'] # feature_list1 = ['ti_15_30_4_03_05', 'tir_15_30_4_03_05'] feature_list1 = [join(options.F1, f) for f in feature_list1] # feature_list2 = ['gt_stft_lpc'] feature_list2 = ['chromagram_harmonic'] feature_list2 = [join(options.F2, f) for f in feature_list2] # Prepare output files. outfile1 = join(options.OUTPUT, 'tbhm_verified_novelty_tol1th103th205_07_v2.csv') self.writeVerifiedHeader(outfile1) # For each audio file, load specific features for audio in audio_files: ao = AudioObj() ao.name = splitext(audio)[0] # Load annotations for specified audio collection. if options.DATASET == 'qupujicheng': annotation_file = join(options.GT, ao.name+'.csv') # qupujicheng ao.gt = np.genfromtxt(annotation_file, usecols=0, delimiter=',') ao.label = np.genfromtxt(annotation_file, usecols=1, delimiter=',', dtype=str) elif options.DATASET == 'salami': annotation_file = join(options.GT, ao.name+'.txt') # iso, salami ao.gt = np.genfromtxt(annotation_file, usecols=0) ao.label = np.genfromtxt(annotation_file, usecols=1, dtype=str) else: annotation_file = join(options.GT, ao.name+'.lab') # beatles ao.gt = np.genfromtxt(annotation_file, usecols=(0,1)) ao.gt = np.unique(np.ndarray.flatten(ao.gt)) ao.label = np.genfromtxt(annotation_file, usecols=2, dtype=str) featureset1, featureset2 = [], [] # Features for 1st round segmentation for feature in feature_list1: for f in os.listdir(feature): if f[:f.find('_vamp')]==ao.name: ao.ssm_timestamps = np.genfromtxt(join(feature, f), delimiter=',',filling_values=0.0)[1:,0] featureset1.append(np.genfromtxt(join(feature, f), delimiter=',',filling_values=0.0)[1:,1:14]) break if len(feature_list1) > 1: n_frame = np.min([x.shape[0] for x in featureset1]) featureset1 = [x[:n_frame,:] for x in featureset1] ao.features1 = np.hstack((featureset1)) else: ao.features1 = featureset1[0] # ao.ssm_timestamps = np.arange(0, ao.gt[-1], float(self.stepSize)/self.SampleRate) # Features for 2nd round verification for feature in feature_list2: for f in os.listdir(feature): if f[:f.find('_vamp')]==ao.name: featureset2.append(np.genfromtxt(join(feature, f), delimiter=',',filling_values=0.0)[:,1:14]) break if len(feature_list2) > 1: n_frame = np.min([x.shape[0] for x in featureset2]) featureset2 = [x[:n_frame,:] for x in featureset2] ao.features2 = np.hstack((featureset2)) else: ao.features2 = featureset2[0] pca = PCA(n_components=6) # Normalise features. # Note that tempogram features are extracted from a different stepsize as the others. step = ao.features1.shape[0] / aggregation_step ao.features1 = resample(ao.features1, step) ao.features1 = normaliseFeature(ao.features1) ao.features2 = normaliseFeature(ao.features2) pca.fit(ao.features1) ao.features1 = pca.transform(ao.features1) pca.fit(ao.features2) ao.features2 = pca.transform(ao.features2) ssm1 = getSSM(ao.features1) ssm1_timestamps = ao.ssm_timestamps[::aggregation_step] # Take care with this! It gains memory pressure when processing large dataset. # audio_list.append(ao) # Segment the music at a coarse scale # Within the neighborhood of each peak candidate, verify the boundary location at a finer scale. novelty, smoothed_novelty, novelty_idxs = novelty_S.process(ssm1, peak_picker, self.kernel_size) novelty_05 = self.pairwiseF(ao.gt, novelty_idxs, tolerance=0.5, combine=1.0, idx2time=ssm1_timestamps) novelty_3 = self.pairwiseF(ao.gt, novelty_idxs, tolerance=3, combine=1.0, idx2time=ssm1_timestamps) # Verification using different features at a finer scale. # Map to the orginal time scale peak_candidates = np.array(map(lambda x: int(np.rint(x*aggregation_step)), novelty_idxs)) # peak_candidates = np.array(map(lambda x: int((np.rint(x*tempo_step))), novelty_idxs)) peak_conf = normaliseArray(smoothed_novelty) if options.VERBOSE: np.savetxt(join(options.CACHE, ao.name+'-sn-raw.txt'), np.array(zip(ssm1_timestamps, peak_conf)), delimiter=',') peak_conf = zoom(peak_conf, aggregation_step) # peak_conf = zoom(peak_conf, tempo_step) peak_candidates = peak_candidates[:len(peak_conf)] verified_novelty_idxs = self.secondRoundDetection(peak_candidates, peak_conf, ao.features2, peak_verifier, tol=1, thresh1=0.3, thresh2=0.5) verified_novelty_idxs = list(set(verified_novelty_idxs)) verified_novelty_05 = self.pairwiseF(ao.gt, verified_novelty_idxs, tolerance=0.5, combine=1.0, idx2time=ao.ssm_timestamps) verified_novelty_3 = self.pairwiseF(ao.gt, verified_novelty_idxs, tolerance=3, combine=1.0, idx2time=ao.ssm_timestamps) # Write results. self.writeVerifiedRes(outfile1, ao.name, novelty_05, novelty_3, verified_novelty_05, verified_novelty_3) if options.VERBOSE: print ao.name, novelty_3.TP, novelty_3.FP, novelty_3.FN, novelty_3.P, novelty_3.R, novelty_3.F, verified_novelty_3.TP, verified_novelty_3.FP, verified_novelty_3.FN, verified_novelty_3.P, verified_novelty_3.R, verified_novelty_3.F np.savetxt(join(options.CACHE, ao.name+'-raw.txt'), novelty_3.detection, delimiter=',') np.savetxt(join(options.CACHE, ao.name+'-verified.txt'), verified_novelty_3.detection, delimiter=',') def main(): segmenter = Seg() segmenter.process() if __name__ == '__main__': main()