annotate utils/SegProperties.py @ 0:26838b1f560f

initial commit of a segmenter project
author mi tian
date Thu, 02 Apr 2015 18:09:27 +0100
parents
children c11ea9e0357f
rev   line source
mi@0 1 #!/usr/bin/env python
mi@0 2 # encoding: utf-8
mi@0 3 """
mi@0 4 SegProperties.py
mi@0 5
mi@0 6 Created by mi tian on 2015-04-02.
mi@0 7 Copyright (c) 2015 __MyCompanyName__. All rights reserved.
mi@0 8 """
mi@0 9
mi@0 10 import sys
mi@0 11 import os
mi@0 12
mi@0 13 class FeatureGMM(object):
mi@0 14 '''Represent segment candidates using single GMMs and compute pairwise distances.'''
mi@0 15 def getGaussianParams(self, length, featureRate, timeWindow):
mi@0 16
mi@0 17 win_len = round(timeWindow * featureRate)
mi@0 18 win_len = win_len + (win_len % 2) - 1
mi@0 19
mi@0 20 # a 50% overlap between windows
mi@0 21 stepsize = ceil(win_len * 0.5)
mi@0 22 num_win = int(floor( (length) / stepsize))
mi@0 23 gaussian_rate = featureRate / stepsize
mi@0 24
mi@0 25 return stepsize, num_win, win_len, gaussian_rate
mi@0 26
mi@0 27 def GaussianDistance(self, feature, featureRate, timeWindow):
mi@0 28
mi@0 29 stepsize, num_win, win_len, gr = self.getGaussianParams(feature.shape[0], featureRate, timeWindow)
mi@0 30 print 'stepsize, num_win, feature', stepsize, num_win, feature.shape, featureRate, timeWindow
mi@0 31 gaussian_list = []
mi@0 32 gaussian_timestamps = []
mi@0 33 tsi = 0
mi@0 34
mi@0 35 # f = open('/Users/mitian/Documents/experiments/features.txt','w')
mi@0 36 # print 'divergence computing..'
mi@0 37 for num in xrange(num_win):
mi@0 38 # print num, num * stepsize , (num * stepsize) + win_len
mi@0 39 gf=GaussianFeature(feature[int(num * stepsize) : int((num * stepsize) + win_len), :],2)
mi@0 40 # f.write("\n%s" %str(gf))
mi@0 41 gaussian_list.append(gf)
mi@0 42 tsi = int(floor( num * stepsize + 1))
mi@0 43 gaussian_timestamps.append(self.timestamp[tsi])
mi@0 44
mi@0 45 # f.close()
mi@0 46
mi@0 47 # print 'gaussian_list', len(gaussian_list), len(gaussian_timestamps)
mi@0 48 dm = np.zeros((len(gaussian_list), len(gaussian_list)))
mi@0 49
mi@0 50 for v1, v2 in combinations(gaussian_list, 2):
mi@0 51 i, j = gaussian_list.index(v1), gaussian_list.index(v2)
mi@0 52 dm[i, j] = v1.distance(v2)
mi@0 53 dm[j, i] = v2.distance(v1)
mi@0 54 # print 'dm[i,j]',dm[i,j]
mi@0 55 # sio.savemat("/Users/mitian/Documents/experiments/dm-from-segmenter.mat",{"dm":dm})
mi@0 56 return dm, gaussian_timestamps
mi@0 57
mi@0 58 def getGMMs(self, feature, segment_boundaries):
mi@0 59 '''Return GMMs for located segments'''
mi@0 60 gmm_list = []
mi@0 61 gmm_list.append(GmmDistance(feature[: segment_boundaries[0], :], components = 1))
mi@0 62 for i in xrange(1, len(segment_boundaries)):
mi@0 63 gmm_list.append(GmmDistance(feature[segment_boundaries[i-1] : segment_boundaries[i], :], components = 1))
mi@0 64 return gmm_list
mi@0 65
mi@0 66
mi@0 67 class FusedPeakSelection(object):
mi@0 68 '''Peak selection from fusion of individual results.'''
mi@0 69 def getFusedPeaks(self, combined_thresh, individual_thresh, individual_tol, combined_tol, w1=None, w2=None, w3=None, w4=None):
mi@0 70 '''Return a list a peak position and the corresponding confidence.'''
mi@0 71 confidence_array = np.zeros_like(w1)
mi@0 72 conf1 = np.zeros_like(w1)
mi@0 73 len_arr = len(w1)
mi@0 74
mi@0 75 # keep peaks retrieved by single feature if its confidence is above individual_thresh
mi@0 76 w1_keep = np.where(w1>=individual_thresh)[0]
mi@0 77 w2_keep = np.where(w2>=individual_thresh)[0]
mi@0 78 w3_keep = np.where(w3>=individual_thresh)[0]
mi@0 79 w4_keep = np.where(w4>=individual_thresh)[0]
mi@0 80 confidence_array[w1_keep] += w1[w1_keep]
mi@0 81 confidence_array[w2_keep] += w2[w2_keep]
mi@0 82 confidence_array[w3_keep] += w3[w3_keep]
mi@0 83 confidence_array[w4_keep] += w4[w4_keep]
mi@0 84
mi@0 85 confidence_array[confidence_array>1] = 1
mi@0 86
mi@0 87 # deal with peaks picked individual features with high confidence first
mi@0 88 i=0
mi@0 89 while i < len_arr:
mi@0 90 if confidence_array[i] > 0:
mi@0 91 temp = [confidence_array[i]]
mi@0 92 pos = [i]
mi@0 93 i += 1
mi@0 94
mi@0 95 # start searching neighborhood for local maximum
mi@0 96 while (i+individual_tol < len_arr and np.max(confidence_array[i:i+individual_tol]) > 0):
mi@0 97 temp += [confidence_array[i+delta] for delta in xrange(individual_tol) if confidence_array[i+delta]>0]
mi@0 98 pos += [i+delta for delta in xrange(individual_tol) if confidence_array[i+delta]>0]
mi@0 99 i += individual_tol
mi@0 100
mi@0 101 if len(temp) == 1:
mi@0 102 conf1[pos[0]] = temp[0]
mi@0 103 else:
mi@0 104 # p = int(np.rint(np.sum(np.multiply(pos,temp))/ np.sum(temp)))
mi@0 105 # conf1[p] = 1
mi@0 106 p = int(np.mean(pos))
mi@0 107 conf1[p] = np.mean(temp)
mi@0 108 else:
mi@0 109 i += 1
mi@0 110 conf1[conf1>1] = 1
mi@0 111
mi@0 112 # Process peaks with low confidence but located by multiple features in the same neighborhood
mi@0 113 # conf2 = copy(conf1)
mi@0 114 conf2 = np.zeros_like(conf1)
mi@0 115 weight1, weight2, weight3, weight4 = copy(w1), copy(w2), copy(w3), copy(w4)
mi@0 116 weight1[weight1>individual_thresh] = 0.0
mi@0 117 weight2[weight2>individual_thresh] = 0.0
mi@0 118 weight3[weight3>individual_thresh] = 0.0
mi@0 119 weight4[weight4>individual_thresh] = 0.0
mi@0 120 combined = weight1 + weight2 + weight3 + weight4
mi@0 121 combined = (combined - np.min(combined)) / (np.max(combined) - np.min(combined))
mi@0 122 if combined[0]>0.3: combined[0] = 0.8
mi@0 123
mi@0 124 i = 0
mi@0 125 while i < len_arr:
mi@0 126 if combined[i] > 0:
mi@0 127 temp = [combined[i]]
mi@0 128 pos = [i]
mi@0 129 i += 1
mi@0 130
mi@0 131 # start searching neighborhood for local maximum
mi@0 132 while (i+combined_tol < len_arr and np.max(combined[i:i+combined_tol]) > 0):
mi@0 133 temp += [combined[i+delta] for delta in xrange(combined_tol) if combined[i+delta]>0]
mi@0 134 pos += [i+delta for delta in xrange(combined_tol) if combined[i+delta]>0]
mi@0 135 i += combined_tol
mi@0 136
mi@0 137 if len(temp) == 1:
mi@0 138 conf2[pos[0]] += temp[0]
mi@0 139 else:
mi@0 140 p = int(np.rint(np.sum(np.multiply(pos,temp))/ np.sum(temp)))
mi@0 141 conf2[p] += np.sum(np.multiply(pos,temp)) / p
mi@0 142 else:
mi@0 143 i += 1
mi@0 144
mi@0 145 conf2[conf2<combined_thresh] = 0
mi@0 146 conf2[conf2>1] = 1
mi@0 147
mi@0 148 combined_conf = conf1 + conf2
mi@0 149 combined_conf[combined_conf>1] = 1
mi@0 150 conf = np.zeros_like(combined_conf)
mi@0 151 # Combine selections from the obove two steps
mi@0 152 i=0
mi@0 153 while i < len_arr:
mi@0 154 if combined_conf[i] > 0.3:
mi@0 155 temp = [combined_conf[i]]
mi@0 156 pos = [i]
mi@0 157 i += 1
mi@0 158
mi@0 159 # start searching neighborhood for local maximum
mi@0 160 while (i+individual_tol < len_arr and np.max(combined_conf[i:i+individual_tol]) > 0.5):
mi@0 161 temp += [combined_conf[i+delta] for delta in xrange(individual_tol) if combined_conf[i+delta]>0.5]
mi@0 162 pos += [i+delta for delta in xrange(individual_tol) if combined_conf[i+delta]>0.5]
mi@0 163 i += individual_tol
mi@0 164
mi@0 165 if len(temp) == 1:
mi@0 166 conf[pos[0]] = combined_conf[pos[0]]
mi@0 167 elif (np.max(temp)== 1 and np.sort(temp)[-2] < combined_thresh):
mi@0 168 p = pos[np.argmax(temp)]
mi@0 169 conf[p] = np.max(temp)
mi@0 170 else:
mi@0 171 p = int(np.rint(np.sum(np.multiply(pos,temp))/ np.sum(temp)))
mi@0 172 conf[p] = np.mean(np.multiply(pos,temp)) / p
mi@0 173 else:
mi@0 174 i += 1
mi@0 175
mi@0 176 peaks = list(np.where(conf>combined_thresh)[0])
mi@0 177 return peaks, conf1, conf2, conf
mi@0 178
mi@0 179 def getPeakWeights(self, sdf, peak_list):
mi@0 180 '''Compute peak confidence.
mi@0 181 Return: array with confidence values at peak positions and zeros otherwise'''
mi@0 182 mask = np.zeros_like(sdf)
mi@0 183 mask[peak_list] = 1.0
mi@0 184 return sdf * mask
mi@0 185
mi@0 186 def selectPeak(self, peak_candidates, featureset, winlen=5):
mi@0 187 dist_list = []
mi@0 188 feature_types = len(featureset)
mi@0 189 gt_dist, hm_dist, tb_dist, tp_dist = [], [], [], []
mi@0 190
mi@0 191 for idx, x in enumerate(peak_candidates):
mi@0 192 prev_features = tuple([featureset[i][x-winlen:x, :] for i in xrange(feature_types)])
mi@0 193 post_features = tuple([featureset[i][x:x+winlen, :] for i in xrange(feature_types)])
mi@0 194 gt_dist.append(np.sum(pairwise_distances(prev_features[0], post_features[0])))
mi@0 195 hm_dist.append(np.sum(pairwise_distances(prev_features[1], post_features[1])))
mi@0 196 tb_dist.append(np.sum(pairwise_distances(prev_features[2], post_features[2])))
mi@0 197 tp_dist.append(np.sum(pairwise_distances(prev_features[3], post_features[3])))
mi@0 198
mi@0 199 return peak_candidates[np.argmax(gt_dist)], peak_candidates[np.argmax(hm_dist)], peak_candidates[np.argmax(tb_dist)], peak_candidates[np.argmax(tp_dist)]
mi@0 200
mi@0 201 def getPeakFeatures(self, peak_candidates, featureset, winlen):
mi@0 202 '''
mi@0 203 args: winlen: length of feature window before and after an investigated peak
mi@0 204 featureset: A list of audio features for measuring the dissimilarity.
mi@0 205
mi@0 206 return: peak_features
mi@0 207 A list of tuples of features for windows before and after each peak.
mi@0 208 '''
mi@0 209 prev_features = []
mi@0 210 post_features = []
mi@0 211 feature_types = len(featureset)
mi@0 212
mi@0 213 # print peak_candidates[-1], winlen, featureset[0].shape
mi@0 214 # if peak_candidates[-1] + winlen > featureset[0].shape[0]:
mi@0 215 # peak_candidates = peak_candidates[:-1]
mi@0 216 # for x in peak_candidates:
mi@0 217 # prev_features.append(tuple([featureset[i][x-winlen:x, :] for i in xrange(feature_types)]))
mi@0 218 # post_features.append(tuple([featureset[i][x:x+winlen, :] for i in xrange(feature_types)]))
mi@0 219 prev_features.append(tuple([featureset[i][:peak_candidates[0], :] for i in xrange(feature_types)]))
mi@0 220 post_features.append(tuple([featureset[i][peak_candidates[0]:peak_candidates[1], :] for i in xrange(feature_types)]))
mi@0 221 for idx in xrange(1, len(peak_candidates)-1):
mi@0 222 prev_features.append(tuple([featureset[i][peak_candidates[idx-1]:peak_candidates[idx], :] for i in xrange(feature_types)]))
mi@0 223 post_features.append(tuple([featureset[i][peak_candidates[idx]:peak_candidates[idx+1], :] for i in xrange(feature_types)]))
mi@0 224 prev_features.append(tuple([featureset[i][peak_candidates[-2]:peak_candidates[-1], :] for i in xrange(feature_types)]))
mi@0 225 post_features.append(tuple([featureset[i][peak_candidates[-1]:, :] for i in xrange(feature_types)]))
mi@0 226 return prev_features, post_features
mi@0 227
mi@0 228 def segStats(self, feature_array, boundary_list):
mi@0 229 '''Return some basic stats of features associated with two boundaries.'''
mi@0 230 feature_stats = []
mi@0 231 for i in xrange(1, len(boundary_list)):
mi@0 232 feature_stats.append(np.std(feature_array[boundary_list[i-1]:boundary_list[i]], axis=0))
mi@0 233 return feature_stats
mi@0 234
mi@0 235 def segmentDev(self, prev_features, post_features, metric='kl'):
mi@0 236 '''Deviations are measured for each given feature type.
mi@0 237 peak_candidates: peaks from the 1st round detection
mi@0 238 peak_features: Features for measuring the dissimilarity for parts before and after each peak.
mi@0 239 dtype: tuple.
mi@0 240 '''
mi@0 241 dev_list = []
mi@0 242 n_peaks = len(prev_features)
mi@0 243 n_features = len(prev_features[0])
mi@0 244 # print 'n_peaks, n_features', n_peaks, n_features
mi@0 245 if metric == 'kl':
mi@0 246 for x in xrange(n_peaks):
mi@0 247 f1, f2 = prev_features[x], post_features[x]
mi@0 248 dev_list.append(tuple([GmmDistance(f1[i], components=1).skl_distance_full(GmmDistance(f2[i], components=1)) for i in xrange(n_features)]))
mi@0 249 elif metric == 'euclidean':
mi@0 250 for x in xrange(n_peaks):
mi@0 251 f1, f2 = prev_features[x], post_features[x]
mi@0 252 dev_list.append(tuple([pairwise_distances(f1[i], f2[i]) for i in xrange(n_features)]))
mi@0 253 return dev_list
mi@0 254
mi@0 255 def main():
mi@0 256 pass
mi@0 257
mi@0 258
mi@0 259 if __name__ == '__main__':
mi@0 260 main()
mi@0 261