mi@0: #!/usr/bin/env python mi@0: # encoding: utf-8 mi@0: """ mi@0: SegProperties.py mi@0: mi@0: Created by mi tian on 2015-04-02. mi@0: Copyright (c) 2015 __MyCompanyName__. All rights reserved. mi@0: """ mi@0: mitian@1: import sys, os mitian@1: import numpy as np mitian@1: from sklearn.metrics.pairwise import pairwise_distances mitian@1: from gmmdist import * mitian@1: from GmmMetrics import GmmDistance mi@0: mi@0: class FeatureGMM(object): mi@0: '''Represent segment candidates using single GMMs and compute pairwise distances.''' mi@0: def getGaussianParams(self, length, featureRate, timeWindow): mi@0: mi@0: win_len = round(timeWindow * featureRate) mi@0: win_len = win_len + (win_len % 2) - 1 mi@0: mi@0: # a 50% overlap between windows mi@0: stepsize = ceil(win_len * 0.5) mi@0: num_win = int(floor( (length) / stepsize)) mi@0: gaussian_rate = featureRate / stepsize mi@0: mi@0: return stepsize, num_win, win_len, gaussian_rate mi@0: mi@0: def GaussianDistance(self, feature, featureRate, timeWindow): mi@0: mi@0: stepsize, num_win, win_len, gr = self.getGaussianParams(feature.shape[0], featureRate, timeWindow) mi@0: print 'stepsize, num_win, feature', stepsize, num_win, feature.shape, featureRate, timeWindow mi@0: gaussian_list = [] mi@0: gaussian_timestamps = [] mi@0: tsi = 0 mi@0: mi@0: # f = open('/Users/mitian/Documents/experiments/features.txt','w') mi@0: # print 'divergence computing..' mi@0: for num in xrange(num_win): mi@0: # print num, num * stepsize , (num * stepsize) + win_len mi@0: gf=GaussianFeature(feature[int(num * stepsize) : int((num * stepsize) + win_len), :],2) mi@0: # f.write("\n%s" %str(gf)) mi@0: gaussian_list.append(gf) mitian@1: tsi = int(np.floor( num * stepsize + 1)) mi@0: gaussian_timestamps.append(self.timestamp[tsi]) mi@0: mi@0: # f.close() mi@0: mi@0: # print 'gaussian_list', len(gaussian_list), len(gaussian_timestamps) mi@0: dm = np.zeros((len(gaussian_list), len(gaussian_list))) mi@0: mi@0: for v1, v2 in combinations(gaussian_list, 2): mi@0: i, j = gaussian_list.index(v1), gaussian_list.index(v2) mi@0: dm[i, j] = v1.distance(v2) mi@0: dm[j, i] = v2.distance(v1) mi@0: # print 'dm[i,j]',dm[i,j] mi@0: # sio.savemat("/Users/mitian/Documents/experiments/dm-from-segmenter.mat",{"dm":dm}) mi@0: return dm, gaussian_timestamps mi@0: mi@0: def getGMMs(self, feature, segment_boundaries): mi@0: '''Return GMMs for located segments''' mi@0: gmm_list = [] mi@0: gmm_list.append(GmmDistance(feature[: segment_boundaries[0], :], components = 1)) mi@0: for i in xrange(1, len(segment_boundaries)): mi@0: gmm_list.append(GmmDistance(feature[segment_boundaries[i-1] : segment_boundaries[i], :], components = 1)) mi@0: return gmm_list mi@0: mi@0: mi@0: class FusedPeakSelection(object): mi@0: '''Peak selection from fusion of individual results.''' mi@0: def getFusedPeaks(self, combined_thresh, individual_thresh, individual_tol, combined_tol, w1=None, w2=None, w3=None, w4=None): mi@0: '''Return a list a peak position and the corresponding confidence.''' mi@0: confidence_array = np.zeros_like(w1) mi@0: conf1 = np.zeros_like(w1) mi@0: len_arr = len(w1) mi@0: mi@0: # keep peaks retrieved by single feature if its confidence is above individual_thresh mi@0: w1_keep = np.where(w1>=individual_thresh)[0] mi@0: w2_keep = np.where(w2>=individual_thresh)[0] mi@0: w3_keep = np.where(w3>=individual_thresh)[0] mi@0: w4_keep = np.where(w4>=individual_thresh)[0] mi@0: confidence_array[w1_keep] += w1[w1_keep] mi@0: confidence_array[w2_keep] += w2[w2_keep] mi@0: confidence_array[w3_keep] += w3[w3_keep] mi@0: confidence_array[w4_keep] += w4[w4_keep] mi@0: mi@0: confidence_array[confidence_array>1] = 1 mi@0: mi@0: # deal with peaks picked individual features with high confidence first mi@0: i=0 mi@0: while i < len_arr: mi@0: if confidence_array[i] > 0: mi@0: temp = [confidence_array[i]] mi@0: pos = [i] mi@0: i += 1 mi@0: mi@0: # start searching neighborhood for local maximum mi@0: while (i+individual_tol < len_arr and np.max(confidence_array[i:i+individual_tol]) > 0): mi@0: temp += [confidence_array[i+delta] for delta in xrange(individual_tol) if confidence_array[i+delta]>0] mi@0: pos += [i+delta for delta in xrange(individual_tol) if confidence_array[i+delta]>0] mi@0: i += individual_tol mi@0: mi@0: if len(temp) == 1: mi@0: conf1[pos[0]] = temp[0] mi@0: else: mi@0: # p = int(np.rint(np.sum(np.multiply(pos,temp))/ np.sum(temp))) mi@0: # conf1[p] = 1 mi@0: p = int(np.mean(pos)) mi@0: conf1[p] = np.mean(temp) mi@0: else: mi@0: i += 1 mi@0: conf1[conf1>1] = 1 mi@0: mi@0: # Process peaks with low confidence but located by multiple features in the same neighborhood mi@0: # conf2 = copy(conf1) mi@0: conf2 = np.zeros_like(conf1) mi@0: weight1, weight2, weight3, weight4 = copy(w1), copy(w2), copy(w3), copy(w4) mi@0: weight1[weight1>individual_thresh] = 0.0 mi@0: weight2[weight2>individual_thresh] = 0.0 mi@0: weight3[weight3>individual_thresh] = 0.0 mi@0: weight4[weight4>individual_thresh] = 0.0 mi@0: combined = weight1 + weight2 + weight3 + weight4 mi@0: combined = (combined - np.min(combined)) / (np.max(combined) - np.min(combined)) mi@0: if combined[0]>0.3: combined[0] = 0.8 mi@0: mi@0: i = 0 mi@0: while i < len_arr: mi@0: if combined[i] > 0: mi@0: temp = [combined[i]] mi@0: pos = [i] mi@0: i += 1 mi@0: mi@0: # start searching neighborhood for local maximum mi@0: while (i+combined_tol < len_arr and np.max(combined[i:i+combined_tol]) > 0): mi@0: temp += [combined[i+delta] for delta in xrange(combined_tol) if combined[i+delta]>0] mi@0: pos += [i+delta for delta in xrange(combined_tol) if combined[i+delta]>0] mi@0: i += combined_tol mi@0: mi@0: if len(temp) == 1: mi@0: conf2[pos[0]] += temp[0] mi@0: else: mi@0: p = int(np.rint(np.sum(np.multiply(pos,temp))/ np.sum(temp))) mi@0: conf2[p] += np.sum(np.multiply(pos,temp)) / p mi@0: else: mi@0: i += 1 mi@0: mi@0: conf2[conf21] = 1 mi@0: mi@0: combined_conf = conf1 + conf2 mi@0: combined_conf[combined_conf>1] = 1 mi@0: conf = np.zeros_like(combined_conf) mi@0: # Combine selections from the obove two steps mi@0: i=0 mi@0: while i < len_arr: mi@0: if combined_conf[i] > 0.3: mi@0: temp = [combined_conf[i]] mi@0: pos = [i] mi@0: i += 1 mi@0: mi@0: # start searching neighborhood for local maximum mi@0: while (i+individual_tol < len_arr and np.max(combined_conf[i:i+individual_tol]) > 0.5): mi@0: temp += [combined_conf[i+delta] for delta in xrange(individual_tol) if combined_conf[i+delta]>0.5] mi@0: pos += [i+delta for delta in xrange(individual_tol) if combined_conf[i+delta]>0.5] mi@0: i += individual_tol mi@0: mi@0: if len(temp) == 1: mi@0: conf[pos[0]] = combined_conf[pos[0]] mi@0: elif (np.max(temp)== 1 and np.sort(temp)[-2] < combined_thresh): mi@0: p = pos[np.argmax(temp)] mi@0: conf[p] = np.max(temp) mi@0: else: mi@0: p = int(np.rint(np.sum(np.multiply(pos,temp))/ np.sum(temp))) mi@0: conf[p] = np.mean(np.multiply(pos,temp)) / p mi@0: else: mi@0: i += 1 mi@0: mi@0: peaks = list(np.where(conf>combined_thresh)[0]) mi@0: return peaks, conf1, conf2, conf mi@0: mi@0: def getPeakWeights(self, sdf, peak_list): mi@0: '''Compute peak confidence. mi@0: Return: array with confidence values at peak positions and zeros otherwise''' mi@0: mask = np.zeros_like(sdf) mi@0: mask[peak_list] = 1.0 mi@0: return sdf * mask mi@0: mi@0: def selectPeak(self, peak_candidates, featureset, winlen=5): mi@0: dist_list = [] mi@0: feature_types = len(featureset) mi@0: gt_dist, hm_dist, tb_dist, tp_dist = [], [], [], [] mi@0: mi@0: for idx, x in enumerate(peak_candidates): mi@0: prev_features = tuple([featureset[i][x-winlen:x, :] for i in xrange(feature_types)]) mi@0: post_features = tuple([featureset[i][x:x+winlen, :] for i in xrange(feature_types)]) mi@0: gt_dist.append(np.sum(pairwise_distances(prev_features[0], post_features[0]))) mi@0: hm_dist.append(np.sum(pairwise_distances(prev_features[1], post_features[1]))) mi@0: tb_dist.append(np.sum(pairwise_distances(prev_features[2], post_features[2]))) mi@0: tp_dist.append(np.sum(pairwise_distances(prev_features[3], post_features[3]))) mi@0: mi@0: return peak_candidates[np.argmax(gt_dist)], peak_candidates[np.argmax(hm_dist)], peak_candidates[np.argmax(tb_dist)], peak_candidates[np.argmax(tp_dist)] mi@0: mi@0: def getPeakFeatures(self, peak_candidates, featureset, winlen): mi@0: ''' mi@0: args: winlen: length of feature window before and after an investigated peak mi@0: featureset: A list of audio features for measuring the dissimilarity. mi@0: mi@0: return: peak_features mi@0: A list of tuples of features for windows before and after each peak. mi@0: ''' mi@0: prev_features = [] mi@0: post_features = [] mi@0: feature_types = len(featureset) mi@0: mi@0: # print peak_candidates[-1], winlen, featureset[0].shape mi@0: # if peak_candidates[-1] + winlen > featureset[0].shape[0]: mi@0: # peak_candidates = peak_candidates[:-1] mi@0: # for x in peak_candidates: mi@0: # prev_features.append(tuple([featureset[i][x-winlen:x, :] for i in xrange(feature_types)])) mi@0: # post_features.append(tuple([featureset[i][x:x+winlen, :] for i in xrange(feature_types)])) mi@0: prev_features.append(tuple([featureset[i][:peak_candidates[0], :] for i in xrange(feature_types)])) mi@0: post_features.append(tuple([featureset[i][peak_candidates[0]:peak_candidates[1], :] for i in xrange(feature_types)])) mi@0: for idx in xrange(1, len(peak_candidates)-1): mi@0: prev_features.append(tuple([featureset[i][peak_candidates[idx-1]:peak_candidates[idx], :] for i in xrange(feature_types)])) mi@0: post_features.append(tuple([featureset[i][peak_candidates[idx]:peak_candidates[idx+1], :] for i in xrange(feature_types)])) mi@0: prev_features.append(tuple([featureset[i][peak_candidates[-2]:peak_candidates[-1], :] for i in xrange(feature_types)])) mi@0: post_features.append(tuple([featureset[i][peak_candidates[-1]:, :] for i in xrange(feature_types)])) mi@0: return prev_features, post_features mi@0: mi@0: def segStats(self, feature_array, boundary_list): mi@0: '''Return some basic stats of features associated with two boundaries.''' mi@0: feature_stats = [] mi@0: for i in xrange(1, len(boundary_list)): mi@0: feature_stats.append(np.std(feature_array[boundary_list[i-1]:boundary_list[i]], axis=0)) mi@0: return feature_stats mi@0: mi@0: def segmentDev(self, prev_features, post_features, metric='kl'): mi@0: '''Deviations are measured for each given feature type. mi@0: peak_candidates: peaks from the 1st round detection mi@0: peak_features: Features for measuring the dissimilarity for parts before and after each peak. mi@0: dtype: tuple. mi@0: ''' mi@0: dev_list = [] mi@0: n_peaks = len(prev_features) mi@0: n_features = len(prev_features[0]) mi@0: # print 'n_peaks, n_features', n_peaks, n_features mi@0: if metric == 'kl': mi@0: for x in xrange(n_peaks): mi@0: f1, f2 = prev_features[x], post_features[x] mi@0: dev_list.append(tuple([GmmDistance(f1[i], components=1).skl_distance_full(GmmDistance(f2[i], components=1)) for i in xrange(n_features)])) mi@0: elif metric == 'euclidean': mi@0: for x in xrange(n_peaks): mi@0: f1, f2 = prev_features[x], post_features[x] mi@0: dev_list.append(tuple([pairwise_distances(f1[i], f2[i]) for i in xrange(n_features)])) mi@0: return dev_list mi@0: mi@0: def main(): mi@0: pass mi@0: mi@0: mi@0: if __name__ == '__main__': mi@0: main() mi@0: